You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC
svn commit: r1152970 [23/26] - in /incubator/kafka: branches/ site/ trunk/
trunk/bin/ trunk/clients/ trunk/clients/clojure/
trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/
trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.util.Arrays
+import junit.framework.Assert._
+import kafka.utils.TestUtils._
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+/**
+ * Shared JUnit test cases for `MessageSet` implementations.
+ *
+ * A concrete suite mixes this trait in and implements `createMessageSet`; each
+ * test below then runs against whatever set that factory returns.
+ */
+trait BaseMessageSetTestCases extends JUnitSuite {
+
+  /** Fixture: the two messages every test stores in a freshly created set. */
+  val messages = Array(new Message("abcd".getBytes()), new Message("efgh".getBytes()))
+
+  /**
+   * Builds the message set under test.
+   *
+   * @param messages the messages the returned set should contain
+   * @return a message set created from `messages`
+   */
+  def createMessageSet(messages: Seq[Message]): MessageSet
+
+  /** Messages read back from a set must equal the messages it was created from. */
+  @Test
+  def testWrittenEqualsRead(): Unit = {
+    val written = createMessageSet(messages)
+    val readBack = written.map(entry => entry.message)
+    checkEquals(messages.iterator, readBack.iterator)
+  }
+
+  /** Two iterators obtained from the same set must yield the same results. */
+  @Test
+  def testIteratorIsConsistent(): Unit = {
+    val set = createMessageSet(messages)
+    checkEquals(set.iterator, set.iterator)
+  }
+
+  /** `sizeInBytes` must be 0 for an empty set and match `MessageSet.messageSetSize` otherwise. */
+  @Test
+  def testSizeInBytes(): Unit = {
+    val emptySet = createMessageSet(Array[Message]())
+    assertEquals("Empty message set should have 0 bytes.", 0L, emptySet.sizeInBytes)
+
+    val predictedSize = MessageSet.messageSetSize(messages).toLong
+    assertEquals("Predicted size should equal actual size.",
+                 predictedSize,
+                 createMessageSet(messages).sizeInBytes)
+  }
+
+  /** Exercises `writeTo` for both an empty and a populated set. */
+  @Test
+  def testWriteTo(): Unit = {
+    // test empty message set
+    testWriteToWithMessageSet(createMessageSet(Array[Message]()))
+    testWriteToWithMessageSet(createMessageSet(messages))
+  }
+
+  /**
+   * Writes `set` to a temporary channel and checks that the reported byte count
+   * equals `set.sizeInBytes` and that a `FileMessageSet` opened over the same
+   * channel iterates over the same entries.
+   *
+   * Not annotated with `@Test`: it is a helper called by `testWriteTo`.
+   *
+   * @param set the message set to write out
+   */
+  def testWriteToWithMessageSet(set: MessageSet): Unit = {
+    val channel = tempChannel()
+    val bytesWritten = set.writeTo(channel, 0, 1024)
+    assertEquals("Expect to write the number of bytes in the set.", set.sizeInBytes, bytesWritten)
+    val reread = new FileMessageSet(channel, false)
+    checkEquals(set.iterator, reread.iterator)
+  }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.nio._
+import junit.framework.Assert._
+import org.junit.Test
+
+/**
+ * Runs the shared `BaseMessageSetTestCases` against `ByteBufferMessageSet` and
+ * adds tests specific to the buffer-backed implementation.
+ */
+class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
+
+  /** Creates an uncompressed `ByteBufferMessageSet` holding `messages`. */
+  override def createMessageSet(messages: Seq[Message]): ByteBufferMessageSet =
+    new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+
+  /**
+   * Appending two stray bytes after the serialized messages must not change
+   * the value reported by `validBytes`.
+   */
+  @Test
+  def testValidBytes(): Unit = {
+    val original = new ByteBufferMessageSet(NoCompressionCodec,
+                                            new Message("hello".getBytes()),
+                                            new Message("there".getBytes()))
+    // Room for the serialized set plus the two trailing bytes of the short.
+    val padded = ByteBuffer.allocate(original.sizeInBytes.toInt + 2)
+    padded.put(original.serialized)
+    padded.putShort(4)
+    val originalPlusGarbage = new ByteBufferMessageSet(padded)
+    assertEquals("Adding invalid bytes shouldn't change byte count",
+                 original.validBytes,
+                 originalPlusGarbage.validBytes)
+  }
+
+  /**
+   * Two sets built independently from equal content must compare equal, both
+   * with `DefaultCompressionCodec` and with `NoCompressionCodec`.
+   */
+  @Test
+  def testEquals(): Unit = {
+    // Fresh Message instances on every call, so the sets share no objects.
+    def helloThere = Seq(new Message("hello".getBytes()), new Message("there".getBytes()))
+
+    val compressed = new ByteBufferMessageSet(DefaultCompressionCodec, helloThere: _*)
+    val compressedTwin = new ByteBufferMessageSet(DefaultCompressionCodec, helloThere: _*)
+    assertTrue(compressed.equals(compressedTwin))
+
+    val uncompressed = new ByteBufferMessageSet(NoCompressionCodec, helloThere: _*)
+    val uncompressedTwin = new ByteBufferMessageSet(NoCompressionCodec, helloThere: _*)
+    assertTrue(uncompressed.equals(uncompressedTwin))
+  }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,41 @@
+package kafka.message
+
+import junit.framework.TestCase
+import kafka.utils.TestUtils
+
+/**
+ * JUnit 3 style tests (methods are discovered by their `test` prefix) for the
+ * compress / decompress round trip offered by `CompressionUtils`.
+ */
+class CompressionUtilTest extends TestCase {
+
+  /** Compressing three messages and decompressing the result yields the same three messages. */
+  def testSimpleCompressDecompress(): Unit = {
+    val originals = List[Message](
+      new Message("hi there".getBytes),
+      new Message("I am fine".getBytes),
+      new Message("I am not so well today".getBytes))
+
+    val compressed = CompressionUtils.compress(originals)
+    val roundTripped = CompressionUtils.decompress(compressed)
+
+    TestUtils.checkLength(roundTripped.iterator,3)
+    TestUtils.checkEquals(originals.iterator, roundTripped.iterator)
+  }
+
+  /**
+   * Compresses the first two messages into one message, compresses that
+   * together with the third message, and checks what decompressing the outer
+   * message yields.
+   */
+  def testComplexCompressDecompress(): Unit = {
+    val originals = List[Message](
+      new Message("hi there".getBytes),
+      new Message("I am fine".getBytes),
+      new Message("I am not so well today".getBytes))
+
+    // Inner layer: the first two messages wrapped into one compressed message.
+    val inner = CompressionUtils.compress(originals.slice(0, 2))
+    // Outer layer: the wrapped pair followed by the remaining plain message.
+    val nested = List[Message](inner):::originals.slice(2,3)
+    val outer = CompressionUtils.compress(nested)
+
+    val roundTripped = CompressionUtils.decompress(outer)
+
+    // NOTE(review): the same result is checked against length 2 and then length 3;
+    // confirm against TestUtils.checkLength / CompressionUtils.decompress whether
+    // both expectations are intended.
+    TestUtils.checkLength(roundTripped.iterator,2)
+    TestUtils.checkLength(roundTripped.iterator,3)
+
+    TestUtils.checkEquals(originals.iterator, roundTripped.iterator)
+  }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.nio._
+import java.util.Arrays
+import junit.framework.TestCase
+import junit.framework.Assert._
+import kafka.utils.TestUtils._
+import org.junit.Test
+import kafka.message._
+
+/**
+ * Runs the shared `BaseMessageSetTestCases` against `FileMessageSet` and adds
+ * tests for file size tracking, recovery from trailing garbage and reads.
+ */
+class FileMessageSetTest extends BaseMessageSetTestCases {
+
+  /** File-backed set pre-populated with the inherited `messages` fixture. */
+  val messageSet = createMessageSet(messages)
+
+  /**
+   * Creates a `FileMessageSet` over a temporary file, appends `messages` to it
+   * uncompressed and flushes it.
+   */
+  override def createMessageSet(messages: Seq[Message]): FileMessageSet = {
+    val fileSet = new FileMessageSet(tempFile(), true)
+    fileSet.append(new ByteBufferMessageSet(NoCompressionCodec, messages: _*))
+    fileSet.flush()
+    fileSet
+  }
+
+  /** `sizeInBytes` must equal the channel size, before and after an append. */
+  @Test
+  def testFileSize(): Unit = {
+    assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+    messageSet.append(singleMessageSet("abcd".getBytes()))
+    assertEquals(messageSet.channel.size, messageSet.sizeInBytes)
+  }
+
+  /** Runs the partial-write scenario for several garbage sizes. */
+  @Test
+  def testIterationOverPartialAndTruncation(): Unit = {
+    for (garbageSize <- List(0, 2, 4, 5, 6))
+      testPartialWrite(garbageSize, messageSet)
+  }
+
+  /**
+   * Writes `size` zero bytes directly to the set's channel, then checks that
+   * iteration still yields the original messages, that `recover()` reports
+   * `size` bytes, that the channel position is back where it was before the
+   * write, and that the contents are still unchanged afterwards.
+   *
+   * Not annotated with `@Test`: it is a helper for
+   * `testIterationOverPartialAndTruncation`.
+   *
+   * @param size       number of zero bytes to write after the valid messages
+   * @param messageSet the set whose channel is written to and then recovered
+   */
+  def testPartialWrite(size: Int, messageSet: FileMessageSet): Unit = {
+    // ByteBuffer.allocate returns a zero-filled buffer with position 0 and
+    // limit == capacity, so it can be written out as-is.
+    val garbage = ByteBuffer.allocate(size)
+    val originalPosition = messageSet.channel.position
+    messageSet.channel.write(garbage)
+    // appending those bytes should not change the contents
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+    assertEquals("Unexpected number of bytes truncated", size.toLong, messageSet.recover())
+    assertEquals("File pointer should now be at the end of the file.",
+                 originalPosition,
+                 messageSet.channel.position)
+    // nor should recovery change the contents
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+  }
+
+  /** Iterating over the set must leave the channel position untouched. */
+  @Test
+  def testIterationDoesntChangePosition(): Unit = {
+    val positionBefore = messageSet.channel.position
+    checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
+    assertEquals(positionBefore, messageSet.channel.position)
+  }
+
+  /**
+   * Reading from offset 0 must return the whole set; reading from the offset
+   * carried by the first entry must return every entry after the first.
+   */
+  @Test
+  def testRead(): Unit = {
+    val everything = messageSet.read(0, messageSet.sizeInBytes)
+    checkEquals(messageSet.iterator, everything.iterator)
+
+    val entries = everything.iterator.toList
+    val afterFirst = messageSet.read(entries.head.offset, messageSet.sizeInBytes)
+    checkEquals(entries.tail.iterator, afterFirst.iterator)
+  }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/message/MessageTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.message
+
+import java.util._
+import java.nio._
+import java.nio.channels._
+import java.io._
+import junit.framework.TestCase
+import junit.framework.Assert._
+import org.scalatest.junit.JUnitSuite
+import org.junit.{Before, Test}
+import kafka.utils.TestUtils
+
+/** Unit tests for the field accessors, checksum handling, equality and hashing of `Message`. */
+class MessageTest extends JUnitSuite {
+
+  // Re-created before every test by setUp().
+  var message: Message = null
+  val payload = "some bytes".getBytes()
+
+  /** Builds a fresh message around `payload` before each test. */
+  @Before
+  def setUp(): Unit = {
+    message = new Message(payload)
+  }
+
+  /** Payload, magic value and an explicitly supplied checksum must be reported back unchanged. */
+  @Test
+  def testFieldValues(): Unit = {
+    TestUtils.checkEquals(ByteBuffer.wrap(payload), message.payload)
+    assertEquals(Message.CurrentMagicValue, message.magic)
+    assertEquals(69L, new Message(69, "hello".getBytes()).checksum)
+  }
+
+  /**
+   * A message built from only a payload must be valid; a message built with a
+   * deliberately wrong checksum must report that checksum and be invalid.
+   */
+  @Test
+  def testChecksum(): Unit = {
+    assertTrue("Auto-computed checksum should be valid", message.isValid)
+    // Parenthesised on purpose: `%` binds tighter than `+`, so without the
+    // parentheses the modulo would apply to the literal 1 only and do nothing.
+    val badChecksum = (message.checksum + 1) % Int.MaxValue
+    val invalid = new Message(badChecksum, payload)
+    assertEquals("Message should return written checksum", badChecksum, invalid.checksum)
+    assertFalse("Message with invalid checksum should be invalid", invalid.isValid)
+  }
+
+  /** `equals` must reject null and foreign types, and accept the message itself and an equal copy. */
+  @Test
+  def testEquality(): Unit = {
+    assertFalse("Should not equal null", message.equals(null))
+    assertFalse("Should not equal a random string", message.equals("asdf"))
+    assertTrue("Should equal itself", message.equals(message))
+    val copy = new Message(message.checksum, payload)
+    assertTrue("Should equal another message with the same content.", message.equals(copy))
+  }
+
+  /** A message used as a `java.util.HashMap` key must be retrievable again. */
+  @Test
+  def testIsHashable(): Unit = {
+    // this is silly, but why not
+    val byMessage = new HashMap[Message,Boolean]()
+    byMessage.put(message, true)
+    assertNotNull(byMessage.get(message))
+  }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,268 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import junit.framework.{Assert, TestCase}
+import java.util.Properties
+import org.easymock.EasyMock
+import kafka.api.ProducerRequest
+import org.apache.log4j.{Logger, Level}
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+import kafka.producer.async._
+import kafka.serializer.Encoder
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+
+/**
+ * Tests for `AsyncProducer`: queue capacity, behaviour after close, batching by
+ * size and by time, sender-thread shutdown and per-topic collation of events.
+ *
+ * Most tests hand the producer an EasyMock `SyncProducer` and verify which
+ * `multiSend` calls it received.
+ */
+class AsyncProducerTest extends JUnitSuite {
+
+  private val messageContent1 = "test"
+  private val topic1 = "test-topic"
+  private val message1: Message = new Message(messageContent1.getBytes)
+
+  private val messageContent2 = "test1"
+  private val topic2 = "test1$topic"
+  private val message2: Message = new Message(messageContent2.getBytes)
+  val asyncProducerLogger = Logger.getLogger(classOf[AsyncProducer[String]])
+
+  /** Sending more events than `queue.size` before the producer is started must raise `QueueFullException`. */
+  @Test
+  def testProducerQueueSize(): Unit = {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(requestOf(topic1, message1, 10))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val config = new AsyncProducerConfig(asyncProducerProps(10))
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    //temporarily set log4j to a higher level to avoid error in the output
+    producer.setLoggerLevel(Level.FATAL)
+    try {
+      try {
+        for(i <- 0 until 11) {
+          producer.send(messageContent1 + "-topic", messageContent1)
+        }
+        Assert.fail("Queue should be full")
+      }
+      catch {
+        case e: QueueFullException => println("Queue is full..")
+      }
+      producer.start
+      producer.close
+      Thread.sleep(2000)
+      EasyMock.verify(basicProducer)
+    } finally {
+      // Restore the level even when an assertion above fails, so a failure
+      // here does not leave the producer's logging at FATAL.
+      producer.setLoggerLevel(Level.ERROR)
+    }
+  }
+
+  /** Sending after `close` must raise `QueueClosedException`. */
+  @Test
+  def testAddAfterQueueClosed(): Unit = {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(requestOf(topic1, message1, 10))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val config = new AsyncProducerConfig(asyncProducerProps(10))
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    for(i <- 0 until 10) {
+      producer.send(messageContent1 + "-topic", messageContent1)
+    }
+    producer.close
+
+    try {
+      producer.send(messageContent1 + "-topic", messageContent1)
+      Assert.fail("Queue should be closed")
+    } catch {
+      case e: QueueClosedException =>
+    }
+    EasyMock.verify(basicProducer)
+  }
+
+  /**
+   * With `batch.size` 5, eleven events must reach the sync producer as two
+   * batches of five followed by one batch of one, in that order (strict mock).
+   */
+  @Test
+  def testBatchSize(): Unit = {
+    val basicProducer = EasyMock.createStrictMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(requestOf(topic1, message1, 5))))
+    EasyMock.expectLastCall.times(2)
+    basicProducer.multiSend(EasyMock.aryEq(Array(requestOf(topic1, message1, 1))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val config = new AsyncProducerConfig(asyncProducerProps(10, "batch.size" -> "5"))
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    for(i <- 0 until 10) {
+      producer.send(messageContent1 + "-topic", messageContent1)
+    }
+
+    Thread.sleep(100)
+    try {
+      producer.send(messageContent1 + "-topic", messageContent1)
+    } catch {
+      case e: QueueFullException =>
+        Assert.fail("Queue should not be full")
+    }
+
+    producer.close
+    EasyMock.verify(basicProducer)
+  }
+
+  /** With `queue.time` 200, three queued events must be sent as a single batch of three. */
+  @Test
+  def testQueueTimeExpired(): Unit = {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(requestOf(topic1, message1, 3))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val config = new AsyncProducerConfig(asyncProducerProps(10, "queue.time" -> "200"))
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+    val serializer = new StringSerializer
+
+    producer.start
+    for(i <- 0 until 3) {
+      producer.send(serializer.getTopic(messageContent1), messageContent1, ProducerRequest.RandomPartition)
+    }
+
+    Thread.sleep(300)
+    producer.close
+    EasyMock.verify(basicProducer)
+  }
+
+  /**
+   * Starts a producer on top of `MockProducer` (whose sends sleep for a second),
+   * queues one event and closes the producer. The test has no assertions.
+   */
+  @Test
+  def testSenderThreadShutdown(): Unit = {
+    val syncProducerProps = new Properties()
+    syncProducerProps.put("host", "localhost")
+    syncProducerProps.put("port", "9092")
+    syncProducerProps.put("buffer.size", "1000")
+    syncProducerProps.put("connect.timeout.ms", "1000")
+    syncProducerProps.put("reconnect.interval", "1000")
+    val basicProducer = new MockProducer(new SyncProducerConfig(syncProducerProps))
+
+    val config = new AsyncProducerConfig(asyncProducerProps(10, "queue.time" -> "100"))
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+    producer.start
+    producer.send(messageContent1 + "-topic", messageContent1)
+    producer.close
+  }
+
+  /**
+   * Events for two topics sent alternately must arrive in one `multiSend`
+   * carrying one request per topic, five messages each.
+   */
+  @Test
+  def testCollateEvents(): Unit = {
+    val basicProducer = EasyMock.createMock(classOf[SyncProducer])
+    basicProducer.multiSend(EasyMock.aryEq(Array(requestOf(topic2, message2, 5),
+                                                 requestOf(topic1, message1, 5))))
+    EasyMock.expectLastCall
+    basicProducer.close
+    EasyMock.expectLastCall
+    EasyMock.replay(basicProducer)
+
+    val config = new AsyncProducerConfig(asyncProducerProps(50, "batch.size" -> "10"))
+    val producer = new AsyncProducer[String](config, basicProducer, new StringSerializer)
+
+    producer.start
+    for(i <- 0 until 5) {
+      producer.send(messageContent1 + "-topic", messageContent1)
+      producer.send(messageContent2 + "$topic", messageContent2, ProducerRequest.RandomPartition)
+    }
+
+    producer.close
+    EasyMock.verify(basicProducer)
+
+  }
+
+  /**
+   * Builds the async-producer properties shared by the tests: host, port,
+   * serializer class and the given queue size, plus any extra settings.
+   *
+   * @param queueSize value for `queue.size`
+   * @param extra     additional key/value settings applied after the common ones
+   */
+  private def asyncProducerProps(queueSize: Int, extra: (String, String)*): Properties = {
+    val props = new Properties()
+    props.put("host", "localhost")
+    props.put("port", "9092")
+    props.put("queue.size", queueSize.toString)
+    props.put("serializer.class", "kafka.producer.StringSerializer")
+    for ((key, value) <- extra)
+      props.put(key, value)
+    props
+  }
+
+  /** The request expected for `count` copies of `message` on `topic` with a random partition. */
+  private def requestOf(topic: String, message: Message, count: Int): ProducerRequest =
+    new ProducerRequest(topic, ProducerRequest.RandomPartition, getMessageSetOfSize(List(message), count))
+
+  /**
+   * Builds an uncompressed message set with `counts` entries.
+   *
+   * Every slot is overwritten once per element of `messages`, so the result
+   * holds `counts` copies of the last element; all callers in this class pass
+   * a single-element list.
+   */
+  private def getMessageSetOfSize(messages: List[Message], counts: Int): ByteBufferMessageSet = {
+    val messageList = new Array[Message](counts)
+    for(message <- messages; i <- 0 until counts)
+      messageList(i) = message
+    new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
+  }
+
+  /** Encoder that wraps the bytes of a string in a `Message`; topics are the event plus "-topic". */
+  class StringSerializer extends Encoder[String] {
+    def toMessage(event: String):Message = new Message(event.getBytes)
+    def getTopic(event: String): String = event.concat("-topic")
+  }
+
+  /** `SyncProducer` whose send methods only sleep for one second. */
+  class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
+    override def send(topic: String, messages: ByteBufferMessageSet): Unit = {
+      Thread.sleep(1000)
+    }
+    override def multiSend(produces: Array[ProducerRequest]): Unit = {
+      Thread.sleep(1000)
+    }
+  }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerMethodsTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2011 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package unit.kafka.producer
+
+import collection.immutable.SortedSet
+import java.util._
+import junit.framework.Assert._
+import kafka.cluster.Partition
+import kafka.common.NoBrokersForPartitionException
+import kafka.producer._
+import org.easymock.EasyMock
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+import scala.collection.immutable.List
+
+/** Tests for `Producer` behaviour that can be checked with mocked collaborators. */
+class ProducerMethodsTest extends JUnitSuite {
+
+  /**
+   * When the broker partition info reports no partitions for a topic, `send`
+   * must throw `NoBrokersForPartitionException` with a message that mentions
+   * the key of the data being sent.
+   */
+  @Test
+  def producerThrowsNoBrokersException(): Unit = {
+    val producerProps = new Properties
+    producerProps.put("broker.list", "placeholder") // Need to fake out having specified one
+    val producerConfig = new ProducerConfig(producerProps)
+
+    val partitioner = EasyMock.createMock(classOf[Partitioner[String]])
+    val producerPool = EasyMock.createMock(classOf[ProducerPool[String]])
+    val brokerPartitionInfo = EasyMock.createMock(classOf[kafka.producer.BrokerPartitionInfo])
+
+    // The topic is known to the mock but has an empty set of partitions.
+    EasyMock.expect(brokerPartitionInfo.getBrokerPartitionInfo("the_topic")).andReturn(SortedSet[Partition]())
+    EasyMock.replay(brokerPartitionInfo)
+
+    val producer =
+      new Producer[String, String](producerConfig, partitioner, producerPool, false, brokerPartitionInfo)
+
+    try {
+      producer.send(new ProducerData[String, String]("the_topic", "the_key", List("the_datum")))
+      fail("Should have thrown a NoBrokersForPartitionException.")
+    } catch {
+      case noBrokers: NoBrokersForPartitionException =>
+        assertTrue(noBrokers.getMessage.contains("the_key"))
+    }
+
+  }
+}
\ No newline at end of file
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,689 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.producer
+
+import async.{AsyncProducerConfig, AsyncProducer}
+import java.util.Properties
+import org.apache.log4j.{Logger, Level}
+import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import kafka.zk.EmbeddedZookeeper
+import org.junit.{After, Before, Test}
+import junit.framework.Assert
+import collection.mutable.HashMap
+import org.easymock.EasyMock
+import java.util.concurrent.ConcurrentHashMap
+import kafka.cluster.Partition
+import org.scalatest.junit.JUnitSuite
+import kafka.common.{InvalidConfigException, UnavailableProducerException, InvalidPartitionException}
+import kafka.utils.{TestUtils, TestZKUtils, Utils}
+import kafka.serializer.{StringEncoder, Encoder}
+import kafka.consumer.SimpleConsumer
+import kafka.api.FetchRequest
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+
+class ProducerTest extends JUnitSuite {
+ // topic name used by most of the mock-based tests; setUp sends to the same name on both brokers
+ private val topic: String = "test-topic"
+ // ids and ports of the two embedded brokers
+ private val brokerId1: Int = 0
+ private val brokerId2: Int = 1
+ private val port1: Int = 9098
+ private val port2: Int = 9099
+ // brokers, raw clients and the embedded zookeeper; all of them are assigned in setUp
+ private var server1: KafkaServer = _
+ private var server2: KafkaServer = _
+ private var producer1: SyncProducer = _
+ private var producer2: SyncProducer = _
+ private var consumer1: SimpleConsumer = _
+ private var consumer2: SimpleConsumer = _
+ private var zkServer: EmbeddedZookeeper = _
+ // logger of the broker request handlers; its level is changed in setUp and tearDown
+ private val requestHandlerLogger: Logger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+ @Before
+ def setUp() {
+   // local factory: broker config for the given id/port with the partition count overridden to 4
+   def fourPartitionConfig(id: Int, listenPort: Int): KafkaConfig = {
+     val brokerProps = TestUtils.createBrokerConfig(id, listenPort)
+     new KafkaConfig(brokerProps) {
+       override val numPartitions = 4
+     }
+   }
+   // local factory: a fresh uncompressed message set holding the single message "test"
+   def primingMessages: ByteBufferMessageSet =
+     new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+       messages = new Message("test".getBytes()))
+
+   // set up 2 brokers with 4 partitions each
+   zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect)
+   server1 = TestUtils.createServer(fourPartitionConfig(brokerId1, port1))
+   server2 = TestUtils.createServer(fourPartitionConfig(brokerId2, port2))
+
+   val clientProps = new Properties()
+   clientProps.put("host", "localhost")
+   clientProps.put("port", port1.toString)
+
+   // one raw sync producer per broker, each sending one message to "test-topic"
+   producer1 = new SyncProducer(new SyncProducerConfig(clientProps))
+   producer1.send("test-topic", primingMessages)
+
+   // same properties as producer1, with the port overridden to the second broker's port
+   producer2 = new SyncProducer(new SyncProducerConfig(clientProps) {
+     override val port = port2
+   })
+   producer2.send("test-topic", primingMessages)
+
+   consumer1 = new SimpleConsumer("localhost", port1, 1000000, 64*1024)
+   consumer2 = new SimpleConsumer("localhost", port2, 100, 64*1024)
+
+   // temporarily set request handler logger to a higher level
+   requestHandlerLogger.setLevel(Level.FATAL)
+
+   Thread.sleep(500)
+ }
+
+ @After
+ def tearDown() {
+   // put the request handler logger back to ERROR (it is raised to FATAL while a test runs)
+   requestHandlerLogger.setLevel(Level.ERROR)
+   // release the client connections opened for this test before the brokers go away;
+   // the clients are created after the brokers, so they may still be unset if set-up failed midway
+   if (producer1 != null) producer1.close
+   if (producer2 != null) producer2.close
+   if (consumer1 != null) consumer1.close
+   if (consumer2 != null) consumer2.close
+   server1.shutdown
+   server2.shutdown
+   // delete the log directories of both brokers
+   Utils.rm(server1.config.logDir)
+   Utils.rm(server2.config.logDir)
+   Thread.sleep(500)
+   zkServer.shutdown
+   Thread.sleep(500)
+ }
+
+ @Test
+ def testSend() {
+   val producerProps = new Properties()
+   producerProps.put("partitioner.class", "kafka.producer.StaticPartitioner")
+   producerProps.put("serializer.class", "kafka.producer.StringSerializer")
+   producerProps.put("zk.connect", TestZKUtils.zookeeperConnect)
+   val producerConfig = new ProducerConfig(producerProps)
+   val lengthPartitioner = new StaticPartitioner
+   val stringSerializer = new StringSerializer
+
+   // one mocked sync producer per broker id
+   val firstBrokerMock = EasyMock.createMock(classOf[SyncProducer])
+   val secondBrokerMock = EasyMock.createMock(classOf[SyncProducer])
+
+   // recorded expectations: a send of "test1" to partition 0 through the mock registered for
+   // brokerId2, plus a close on both mocks
+   val expectedMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+     messages = new Message("test1".getBytes))
+   secondBrokerMock.send(topic, 0, expectedMessages)
+   EasyMock.expectLastCall()
+   firstBrokerMock.close
+   EasyMock.expectLastCall()
+   secondBrokerMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(firstBrokerMock)
+   EasyMock.replay(secondBrokerMock)
+
+   val mockedSyncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+   mockedSyncProducers.put(brokerId1, firstBrokerMock)
+   mockedSyncProducers.put(brokerId2, secondBrokerMock)
+
+   val pool = new ProducerPool(producerConfig, stringSerializer, mockedSyncProducers,
+     new ConcurrentHashMap[Int, AsyncProducer[String]]())
+   val producer = new Producer[String, String](producerConfig, lengthPartitioner, pool, false, null)
+
+   producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
+   producer.close
+
+   // both mocks must have received the calls recorded above
+   EasyMock.verify(firstBrokerMock)
+   EasyMock.verify(secondBrokerMock)
+ }
+
+ @Test
+ def testSendSingleMessage() {
+   val producerProps = new Properties()
+   producerProps.put("serializer.class", "kafka.serializer.StringEncoder")
+   producerProps.put("broker.list", "0:localhost:9092")
+   val producerConfig = new ProducerConfig(producerProps)
+   val lengthPartitioner = new StaticPartitioner
+   val stringSerializer = new StringSerializer
+
+   // a single mocked sync producer, registered under brokerId1
+   val brokerMock = EasyMock.createMock(classOf[SyncProducer])
+   // recorded expectations: a send of "t" with partition -1, plus a close
+   val expectedMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+     messages = new Message("t".getBytes()))
+   brokerMock.send(topic, -1, expectedMessages)
+   EasyMock.expectLastCall()
+   brokerMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(brokerMock)
+
+   val mockedSyncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+   mockedSyncProducers.put(brokerId1, brokerMock)
+
+   val pool = new ProducerPool[String](producerConfig, stringSerializer, mockedSyncProducers,
+     new ConcurrentHashMap[Int, AsyncProducer[String]]())
+   val producer = new Producer[String, String](producerConfig, lengthPartitioner, pool, false, null)
+
+   // the record is built from a topic and a single datum only
+   producer.send(new ProducerData[String, String](topic, "t"))
+   producer.close
+
+   EasyMock.verify(brokerMock)
+ }
+
+ @Test
+ def testInvalidPartition() {
+   // NegativePartitioner always answers -1, so the send below is expected to be rejected
+   val props = new Properties()
+   props.put("partitioner.class", "kafka.producer.NegativePartitioner")
+   props.put("serializer.class", "kafka.producer.StringSerializer")
+   props.put("zk.connect", TestZKUtils.zookeeperConnect)
+   val config = new ProducerConfig(props)
+
+   val richProducer = new Producer[String, String](config)
+   try {
+     richProducer.send(new ProducerData[String, String](topic, "test", Array("test")))
+     Assert.fail("Should fail with InvalidPartitionException")
+   } catch {
+     case e: InvalidPartitionException => // expected, do nothing
+   } finally {
+     // always release the producer, whether the expected exception showed up or not
+     richProducer.close
+   }
+ }
+
+ @Test
+ def testDefaultEncoder() {
+   val props = new Properties()
+   props.put("zk.connect", TestZKUtils.zookeeperConnect)
+   val config = new ProducerConfig(props)
+
+   // no serializer.class configured: sending String data is expected to raise ClassCastException
+   val stringProducer1 = new Producer[String, String](config)
+   try {
+     stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test")))
+     fail("Should fail with ClassCastException due to incompatible Encoder")
+   } catch {
+     case e: ClassCastException => // expected
+   } finally {
+     stringProducer1.close
+   }
+
+   // with an explicit StringEncoder the same send must go through
+   props.put("serializer.class", "kafka.serializer.StringEncoder")
+   val stringProducer2 = new Producer[String, String](new ProducerConfig(props))
+   try {
+     stringProducer2.send(new ProducerData[String, String](topic, "test", Array("test")))
+   } finally {
+     stringProducer2.close
+   }
+
+   // NOTE(review): `config` was built before serializer.class was put into props, so it presumably
+   // still selects the default encoder - confirm against ProducerConfig
+   val messageProducer1 = new Producer[String, Message](config)
+   try {
+     messageProducer1.send(new ProducerData[String, Message](topic, "test", Array(new Message("test".getBytes))))
+   } catch {
+     case e: ClassCastException => fail("Should not fail with ClassCastException due to default Encoder")
+   } finally {
+     messageProducer1.close
+   }
+ }
+
+ @Test
+ def testSyncProducerPool() {
+   // one mocked sync producer per broker id
+   val firstBrokerMock = EasyMock.createMock(classOf[SyncProducer])
+   val secondBrokerMock = EasyMock.createMock(classOf[SyncProducer])
+
+   // recorded expectations: a send of "test1" to partition 0 through the first mock, plus a close
+   // on both mocks
+   val expectedMessages = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+     messages = new Message("test1".getBytes))
+   firstBrokerMock.send("test-topic", 0, expectedMessages)
+   EasyMock.expectLastCall()
+   firstBrokerMock.close
+   EasyMock.expectLastCall()
+   secondBrokerMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(firstBrokerMock)
+   EasyMock.replay(secondBrokerMock)
+
+   val mockedSyncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+   mockedSyncProducers.put(brokerId1, firstBrokerMock)
+   mockedSyncProducers.put(brokerId2, secondBrokerMock)
+
+   // default for producer.type is "sync"
+   val poolProps = new Properties()
+   poolProps.put("partitioner.class", "kafka.producer.NegativePartitioner")
+   poolProps.put("serializer.class", "kafka.producer.StringSerializer")
+   val pool = new ProducerPool[String](new ProducerConfig(poolProps), new StringSerializer,
+     mockedSyncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+
+   // address partition 0 of brokerId1 directly through the pool
+   val poolData = pool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1"))
+   pool.send(poolData)
+
+   pool.close
+   EasyMock.verify(firstBrokerMock)
+   EasyMock.verify(secondBrokerMock)
+ }
+
+ @Test
+ def testAsyncProducerPool() {
+   // one mocked async producer per broker id
+   val firstBrokerMock = EasyMock.createMock(classOf[AsyncProducer[String]])
+   val secondBrokerMock = EasyMock.createMock(classOf[AsyncProducer[String]])
+
+   // recorded expectations: a send of "test1" for partition 0 through the first mock, plus a close
+   // on both mocks
+   firstBrokerMock.send(topic, "test1", 0)
+   EasyMock.expectLastCall()
+   firstBrokerMock.close
+   EasyMock.expectLastCall()
+   secondBrokerMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(firstBrokerMock)
+   EasyMock.replay(secondBrokerMock)
+
+   val mockedAsyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+   mockedAsyncProducers.put(brokerId1, firstBrokerMock)
+   mockedAsyncProducers.put(brokerId2, secondBrokerMock)
+
+   // change producer.type to "async"
+   val poolProps = new Properties()
+   poolProps.put("partitioner.class", "kafka.producer.NegativePartitioner")
+   poolProps.put("serializer.class", "kafka.producer.StringSerializer")
+   poolProps.put("producer.type", "async")
+   val pool = new ProducerPool[String](new ProducerConfig(poolProps), new StringSerializer,
+     new ConcurrentHashMap[Int, SyncProducer](), mockedAsyncProducers)
+
+   // address partition 0 of brokerId1 directly through the pool
+   val poolData = pool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1"))
+   pool.send(poolData)
+
+   pool.close
+   EasyMock.verify(firstBrokerMock)
+   EasyMock.verify(secondBrokerMock)
+ }
+
+ @Test
+ def testSyncUnavailableProducerException() {
+   val unregisteredMock = EasyMock.createMock(classOf[SyncProducer])
+   val registeredMock = EasyMock.createMock(classOf[SyncProducer])
+   // only a close is recorded, and only on the mock that is put into the pool
+   registeredMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(unregisteredMock)
+   EasyMock.replay(registeredMock)
+
+   // the pool knows brokerId2 only; nothing is registered for brokerId1
+   val mockedSyncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+   mockedSyncProducers.put(brokerId2, registeredMock)
+
+   // default for producer.type is "sync"
+   val poolProps = new Properties()
+   poolProps.put("partitioner.class", "kafka.producer.NegativePartitioner")
+   poolProps.put("serializer.class", "kafka.producer.StringSerializer")
+   val pool = new ProducerPool[String](new ProducerConfig(poolProps), new StringSerializer,
+     mockedSyncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+
+   try {
+     // the request targets brokerId1, for which the pool holds no producer
+     pool.send(pool.getProducerPoolData("test-topic", new Partition(brokerId1, 0), Array("test1")))
+     Assert.fail("Should fail with UnavailableProducerException")
+   } catch {
+     case e: UnavailableProducerException => // expected
+   }
+
+   pool.close
+   EasyMock.verify(unregisteredMock)
+   EasyMock.verify(registeredMock)
+ }
+
+ @Test
+ def testAsyncUnavailableProducerException() {
+   val unregisteredMock = EasyMock.createMock(classOf[AsyncProducer[String]])
+   val registeredMock = EasyMock.createMock(classOf[AsyncProducer[String]])
+   // only a close is recorded, and only on the mock that is put into the pool
+   registeredMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(unregisteredMock)
+   EasyMock.replay(registeredMock)
+
+   // the pool knows brokerId2 only; nothing is registered for brokerId1
+   val mockedAsyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+   mockedAsyncProducers.put(brokerId2, registeredMock)
+
+   // change producer.type to "async"
+   val poolProps = new Properties()
+   poolProps.put("partitioner.class", "kafka.producer.NegativePartitioner")
+   poolProps.put("serializer.class", "kafka.producer.StringSerializer")
+   poolProps.put("producer.type", "async")
+   val pool = new ProducerPool[String](new ProducerConfig(poolProps), new StringSerializer,
+     new ConcurrentHashMap[Int, SyncProducer](), mockedAsyncProducers)
+
+   try {
+     // the request targets brokerId1, for which the pool holds no producer
+     pool.send(pool.getProducerPoolData(topic, new Partition(brokerId1, 0), Array("test1")))
+     Assert.fail("Should fail with UnavailableProducerException")
+   } catch {
+     case e: UnavailableProducerException => // expected
+   }
+
+   pool.close
+   EasyMock.verify(unregisteredMock)
+   EasyMock.verify(registeredMock)
+ }
+
+ @Test
+ def testConfigBrokerPartitionInfoWithPartitioner {
+   // a config that names a partitioner.class together with a broker.list is expected to be rejected
+   val props = new Properties()
+   props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+   props.put("serializer.class", "kafka.producer.StringSerializer")
+   props.put("producer.type", "async")
+   // each entry is written as id:host:port:4; the two entries are joined with a comma
+   val firstBroker = brokerId1 + ":" + "localhost" + ":" + port1 + ":" + 4
+   val secondBroker = brokerId2 + ":" + "localhost" + ":" + port2 + ":" + 4
+   props.put("broker.list", firstBroker + "," + secondBroker)
+
+   try {
+     // only the constructor's validation matters here, so the instance is not kept
+     new ProducerConfig(props)
+     fail("should fail with InvalidConfigException due to presence of partitioner.class and broker.list")
+   } catch {
+     case e: InvalidConfigException => // expected
+   }
+ }
+
+ @Test
+ def testConfigBrokerPartitionInfo() {
+   // broker information comes from a static broker.list entry (id:host:port) here
+   val producerProps = new Properties()
+   producerProps.put("serializer.class", "kafka.producer.StringSerializer")
+   producerProps.put("producer.type", "async")
+   producerProps.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
+
+   val producerConfig = new ProducerConfig(producerProps)
+   val lengthPartitioner = new StaticPartitioner
+   val stringSerializer = new StringSerializer
+
+   // a single mocked async producer, registered under brokerId1
+   val brokerMock = EasyMock.createMock(classOf[AsyncProducer[String]])
+   // recorded expectations: a send of "test1" with partition -1, plus a close
+   brokerMock.send(topic, "test1", -1)
+   EasyMock.expectLastCall()
+   brokerMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(brokerMock)
+
+   val mockedAsyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+   mockedAsyncProducers.put(brokerId1, brokerMock)
+
+   val pool = new ProducerPool(producerConfig, stringSerializer,
+     new ConcurrentHashMap[Int, SyncProducer](), mockedAsyncProducers)
+   val producer = new Producer[String, String](producerConfig, lengthPartitioner, pool, false, null)
+
+   producer.send(new ProducerData[String, String](topic, "test1", Array("test1")))
+   producer.close
+
+   EasyMock.verify(brokerMock)
+ }
+
+ @Test
+ def testZKSendToNewTopic() {
+   val props = new Properties()
+   props.put("serializer.class", "kafka.serializer.StringEncoder")
+   props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+   props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+   val config = new ProducerConfig(props)
+
+   val producer = new Producer[String, String](config)
+   try {
+     // Available broker id, partition id at this stage should be (0,0), (1,0)
+     // this should send the message to broker 0 on partition 0
+     producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+     Thread.sleep(100)
+     // Available broker id, partition id at this stage should be (0,0), (0,1), (0,2), (0,3), (1,0)
+     // Since 4 % 5 = 4, this should send the message to broker 1 on partition 0
+     producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+     Thread.sleep(100)
+     // cross check if brokers got the messages
+     val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+     Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+     Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+     val messageSet2 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+     Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
+     Assert.assertEquals(new Message("test1".getBytes), messageSet2.next.message)
+   } catch {
+     case e: Exception => fail("Not expected", e)
+   } finally {
+     // assertion failures are not Exceptions and bypass the catch above; close the producer regardless
+     producer.close
+   }
+ }
+
+ @Test
+ def testZKSendWithDeadBroker() {
+   val props = new Properties()
+   props.put("serializer.class", "kafka.serializer.StringEncoder")
+   props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+   props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+   val config = new ProducerConfig(props)
+
+   val producer = new Producer[String, String](config)
+   try {
+     // Available broker id, partition id at this stage should be (0,0), (1,0)
+     // Hence, this should send the message to broker 0 on partition 0
+     producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+     Thread.sleep(100)
+     // kill 2nd broker
+     server2.shutdown
+     Thread.sleep(100)
+     // Available broker id, partition id at this stage should be (0,0), (0,1), (0,2), (0,3), (1,0)
+     // Since 4 % 5 = 4, in a normal case, it would send to broker 1 on partition 0. But since broker 1 is down,
+     // 4 % 4 = 0, So it should send the message to broker 0 on partition 0
+     producer.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+     Thread.sleep(100)
+     // cross check if brokers got the messages: both messages are expected on the first broker
+     val messageSet1 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+     Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+     Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+     Assert.assertTrue("Message set should have another message", messageSet1.hasNext)
+     Assert.assertEquals(new Message("test1".getBytes), messageSet1.next.message)
+   } catch {
+     // keep the original exception as the cause so the failure report shows what went wrong
+     case e: Exception => fail("Not expected", e)
+   } finally {
+     // assertion failures are not Exceptions and bypass the catch above; close the producer regardless
+     producer.close
+   }
+ }
+
+ @Test
+ def testZKSendToExistingTopicWithNoBrokers() {
+   val props = new Properties()
+   props.put("serializer.class", "kafka.serializer.StringEncoder")
+   props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+   props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+   val config = new ProducerConfig(props)
+
+   val producer = new Producer[String, String](config)
+   // broker that is started in the middle of the test; stays null until then
+   var server: KafkaServer = null
+
+   try {
+     // shutdown server1
+     server1.shutdown
+     Thread.sleep(100)
+     // Available broker id, partition id at this stage should be (1,0)
+     // this should send the message to broker 1 on partition 0
+     producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+     Thread.sleep(100)
+     // cross check if brokers got the messages
+     val messageSet1 = consumer2.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+     Assert.assertTrue("Message set should have 1 message", messageSet1.hasNext)
+     Assert.assertEquals(new Message("test".getBytes), messageSet1.next.message)
+
+     // shutdown server2
+     server2.shutdown
+     Thread.sleep(100)
+     // delete the new-topic logs
+     Utils.rm(server2.config.logDir)
+     Thread.sleep(100)
+     // start a broker with id brokerId1 on port1 again, using a freshly created broker config
+     // NOTE(review): the original comment spoke of "broker 2" being restarted; the code restarts
+     // the first broker's id and port - confirm which one was intended
+     val props2 = TestUtils.createBrokerConfig(brokerId1, port1)
+     val config2 = new KafkaConfig(props2) {
+       override val numPartitions = 4
+     }
+     server = TestUtils.createServer(config2)
+     Thread.sleep(100)
+
+     // now there are no brokers registered under new-topic.
+     producer.send(new ProducerData[String, String]("new-topic", "test", Array("test")))
+     Thread.sleep(100)
+
+     // cross check if brokers got the messages
+     val messageSet2 = consumer1.fetch(new FetchRequest("new-topic", 0, 0, 10000)).iterator
+     Assert.assertTrue("Message set should have 1 message", messageSet2.hasNext)
+     Assert.assertEquals(new Message("test".getBytes), messageSet2.next.message)
+
+   } catch {
+     case e: Exception => fail("Not expected", e)
+   } finally {
+     try {
+       // `server` is still null when the test failed before the restart; dereferencing it here
+       // would replace the real failure with a NullPointerException
+       if (server != null) {
+         server.shutdown
+         Utils.rm(server.config.logDir)
+       }
+     } finally {
+       producer.close
+     }
+   }
+ }
+
+ @Test
+ def testPartitionedSendToNewTopic() {
+   val producerProps = new Properties()
+   producerProps.put("partitioner.class", "kafka.producer.StaticPartitioner")
+   producerProps.put("serializer.class", "kafka.producer.StringSerializer")
+   producerProps.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+   val producerConfig = new ProducerConfig(producerProps)
+   val lengthPartitioner = new StaticPartitioner
+   val stringSerializer = new StringSerializer
+
+   // one mocked sync producer per broker id
+   val firstBrokerMock = EasyMock.createMock(classOf[SyncProducer])
+   val secondBrokerMock = EasyMock.createMock(classOf[SyncProducer])
+
+   // recorded expectations: two sends of "test1" to partition 0 of "test-topic1", both through the
+   // first mock, plus a close on both mocks
+   firstBrokerMock.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+     messages = new Message("test1".getBytes)))
+   EasyMock.expectLastCall()
+   firstBrokerMock.send("test-topic1", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+     messages = new Message("test1".getBytes)))
+   EasyMock.expectLastCall()
+   firstBrokerMock.close
+   EasyMock.expectLastCall()
+   secondBrokerMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(firstBrokerMock)
+   EasyMock.replay(secondBrokerMock)
+
+   val mockedSyncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+   mockedSyncProducers.put(brokerId1, firstBrokerMock)
+   mockedSyncProducers.put(brokerId2, secondBrokerMock)
+
+   val pool = new ProducerPool(producerConfig, stringSerializer, mockedSyncProducers,
+     new ConcurrentHashMap[Int, AsyncProducer[String]]())
+   val producer = new Producer[String, String](producerConfig, lengthPartitioner, pool, false, null)
+
+   // first send through the mocked pool
+   producer.send(new ProducerData[String, String]("test-topic1", "test", Array("test1")))
+   Thread.sleep(100)
+
+   // now send again to this topic using a real producer, this time all brokers would have registered
+   // their partitions in zookeeper
+   producer1.send("test-topic1", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+     messages = new Message("test".getBytes())))
+   Thread.sleep(100)
+
+   // second send through the mocked pool, this time with the key "test1"
+   producer.send(new ProducerData[String, String]("test-topic1", "test1", Array("test1")))
+   Thread.sleep(100)
+   producer.close
+
+   EasyMock.verify(firstBrokerMock)
+   EasyMock.verify(secondBrokerMock)
+ }
+
+ @Test
+ def testPartitionedSendToNewBrokerInExistingTopic() {
+   // id and port of the third broker that joins while the test runs
+   val brokerId3 = 2
+   val port3 = 9094
+
+   val props = new Properties()
+   props.put("partitioner.class", "kafka.producer.StaticPartitioner")
+   props.put("serializer.class", "kafka.producer.StringSerializer")
+   props.put("zk.connect", TestZKUtils.zookeeperConnect)
+
+   val config = new ProducerConfig(props)
+   val partitioner = new StaticPartitioner
+   val serializer = new StringSerializer
+
+   // 3 mocked sync producers, one per broker id
+   val syncProducers = new ConcurrentHashMap[Int, SyncProducer]()
+   val syncProducer1 = EasyMock.createMock(classOf[SyncProducer])
+   val syncProducer2 = EasyMock.createMock(classOf[SyncProducer])
+   val syncProducer3 = EasyMock.createMock(classOf[SyncProducer])
+   // recorded expectations: a send of "test1" to partition 2 through the third mock, plus a close
+   // on all three mocks
+   syncProducer3.send("test-topic", 2, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+     messages = new Message("test1".getBytes)))
+   EasyMock.expectLastCall
+   syncProducer1.close
+   EasyMock.expectLastCall
+   syncProducer2.close
+   EasyMock.expectLastCall
+   syncProducer3.close
+   EasyMock.expectLastCall
+   EasyMock.replay(syncProducer1)
+   EasyMock.replay(syncProducer2)
+   EasyMock.replay(syncProducer3)
+
+   syncProducers.put(brokerId1, syncProducer1)
+   syncProducers.put(brokerId2, syncProducer2)
+   syncProducers.put(brokerId3, syncProducer3)
+
+   val producerPool = new ProducerPool(config, serializer, syncProducers, new ConcurrentHashMap[Int, AsyncProducer[String]]())
+   val producer = new Producer[String, String](config, partitioner, producerPool, false, null)
+
+   val serverProps = TestUtils.createBrokerConfig(brokerId3, port3)
+   val serverConfig = new KafkaConfig(serverProps) {
+     override val numPartitions = 4
+   }
+   val server3 = TestUtils.createServer(serverConfig)
+   try {
+     Thread.sleep(500)
+
+     // send a message to the new broker to register it under topic "test-topic"
+     val tempProps = new Properties()
+     tempProps.put("host", "localhost")
+     tempProps.put("port", port3.toString)
+     val tempProducer = new SyncProducer(new SyncProducerConfig(tempProps))
+     try {
+       tempProducer.send("test-topic", new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
+         messages = new Message("test".getBytes())))
+     } finally {
+       // the helper producer is needed for this one send only
+       tempProducer.close
+     }
+
+     Thread.sleep(500)
+
+     try {
+       producer.send(new ProducerData[String, String]("test-topic", "test-topic", Array("test1")))
+     } finally {
+       // must happen before verify: the close calls on the mocks are part of the expectations
+       producer.close
+     }
+
+     EasyMock.verify(syncProducer1)
+     EasyMock.verify(syncProducer2)
+     EasyMock.verify(syncProducer3)
+   } finally {
+     // stop the extra broker and drop its logs even when the test fails, so that its port and
+     // directory do not leak into later tests
+     server3.shutdown
+     Utils.rm(server3.config.logDir)
+   }
+ }
+
+ @Test
+ def testDefaultPartitioner() {
+   // broker information comes from a static broker.list entry (id:host:port) here
+   val producerProps = new Properties()
+   producerProps.put("serializer.class", "kafka.producer.StringSerializer")
+   producerProps.put("producer.type", "async")
+   producerProps.put("broker.list", brokerId1 + ":" + "localhost" + ":" + port1)
+   val producerConfig = new ProducerConfig(producerProps)
+   val defaultPartitioner = new DefaultPartitioner[String]
+   val stringSerializer = new StringSerializer
+
+   // a single mocked async producer, registered under brokerId1
+   val brokerMock = EasyMock.createMock(classOf[AsyncProducer[String]])
+   // recorded expectations: a send of "test1" with partition -1, plus a close
+   brokerMock.send(topic, "test1", -1)
+   EasyMock.expectLastCall()
+   brokerMock.close
+   EasyMock.expectLastCall()
+   EasyMock.replay(brokerMock)
+
+   val mockedAsyncProducers = new ConcurrentHashMap[Int, AsyncProducer[String]]()
+   mockedAsyncProducers.put(brokerId1, brokerMock)
+
+   val pool = new ProducerPool(producerConfig, stringSerializer,
+     new ConcurrentHashMap[Int, SyncProducer](), mockedAsyncProducers)
+   val producer = new Producer[String, String](producerConfig, defaultPartitioner, pool, false, null)
+
+   producer.send(new ProducerData[String, String](topic, "test", Array("test1")))
+   producer.close
+
+   EasyMock.verify(brokerMock)
+ }
+}
+
+class StringSerializer extends Encoder[String] {
+  // renders a message through its toString
+  def toEvent(message: Message): String = {
+    message.toString
+  }
+
+  // wraps the bytes of the string (platform default charset) into a Message
+  def toMessage(event: String): Message = {
+    val payload = event.getBytes
+    new Message(payload)
+  }
+
+  // the topic name is the event text followed by "-topic"
+  def getTopic(event: String): String = {
+    event.concat("-topic")
+  }
+}
+
+class NegativePartitioner extends Partitioner[String] {
+  // ignores both arguments and always answers -1
+  def partition(data: String, numPartitions: Int): Int = -1
+}
+
+class StaticPartitioner extends Partitioner[String] {
+  // the partition is the length of the key modulo the number of partitions
+  def partition(data: String, numPartitions: Int): Int = {
+    val keyLength = data.length
+    keyLength % numPartitions
+  }
+}
+
+class HashPartitioner extends Partitioner[String] {
+  /**
+   * Maps a key to a partition index derived from its hash code.
+   *
+   * String.hashCode can be negative and `%` keeps the sign of the dividend, so the raw remainder
+   * is shifted into the range [0, numPartitions) when it is negative. Keys with a non-negative
+   * hash code get the same index as with a plain `hashCode % numPartitions`.
+   *
+   * @param data the key to partition by
+   * @param numPartitions number of partitions to choose from; must be positive
+   * @return an index between 0 (inclusive) and numPartitions (exclusive)
+   */
+  def partition(data: String, numPartitions: Int): Int = {
+    val remainder = data.hashCode % numPartitions
+    if (remainder < 0) remainder + numPartitions else remainder
+  }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.producer
+
+import junit.framework.{Assert, TestCase}
+import kafka.utils.SystemTime
+import kafka.utils.TestUtils
+import kafka.server.{KafkaServer, KafkaConfig}
+import org.apache.log4j.{Logger, Level}
+import org.scalatest.junit.JUnitSuite
+import org.junit.{After, Before, Test}
+import kafka.common.MessageSizeTooLargeException
+import java.util.Properties
+import kafka.api.ProducerRequest
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+class SyncProducerTest extends JUnitSuite {
+ // two-byte, zero-filled payload that the tests wrap into a Message; never reassigned
+ private val messageBytes = new Array[Byte](2)
+ // broker under test; assigned in setUp
+ private var server: KafkaServer = null
+ // logger of SyncProducer; some tests change its level while they run
+ val simpleProducerLogger = Logger.getLogger(classOf[SyncProducer])
+
+ @Before
+ def setUp() {
+   // broker 0 on port 9092, with zookeeper usage switched off
+   val brokerProps = TestUtils.createBrokerConfig(0, 9092)
+   val standaloneConfig = new KafkaConfig(brokerProps) {
+     override val enableZookeeper = false
+   }
+   server = TestUtils.createServer(standaloneConfig)
+ }
+
+ @After
+ def tearDown() {
+   server.shutdown
+   // remove the broker's log directory so that repeated runs do not pile up log files
+   kafka.utils.Utils.rm(server.config.logDir)
+ }
+
+ @Test
+ def testUnreachableServer() {
+   // "NOT_USED" is not a reachable host, so both sends below are expected to fail
+   val props = new Properties()
+   props.put("host", "NOT_USED")
+   props.put("port", "9092")
+   props.put("buffer.size", "102400")
+   props.put("connect.timeout.ms", "300")
+   props.put("reconnect.interval", "1000")
+   val producer = new SyncProducer(new SyncProducerConfig(props))
+   var failed = false
+   val firstStart = SystemTime.milliseconds
+
+   //temporarily increase log4j level to avoid error in output
+   simpleProducerLogger.setLevel(Level.FATAL)
+   try {
+     try {
+       producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+     } catch {
+       case e: Exception => failed = true
+     }
+     Assert.assertTrue(failed)
+     failed = false
+     val firstEnd = SystemTime.milliseconds
+     println("First message send retries took " + (firstEnd-firstStart) + " ms")
+     Assert.assertTrue((firstEnd-firstStart) < 300)
+
+     val secondStart = SystemTime.milliseconds
+     try {
+       producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+     } catch {
+       case e: Exception => failed = true
+     }
+     // the second attempt has to fail as well
+     Assert.assertTrue(failed)
+     val secondEnd = SystemTime.milliseconds
+     println("Second message send retries took " + (secondEnd-secondStart) + " ms")
+     // measure from secondStart; subtracting secondEnd from itself is always 0
+     Assert.assertTrue((secondEnd-secondStart) < 300)
+   } finally {
+     // restore the logger level and release the producer even when an assertion fails
+     simpleProducerLogger.setLevel(Level.ERROR)
+     producer.close
+   }
+ }
+
+ @Test
+ def testReachableServer() {
+   // points at the broker started for this test (localhost:9092); every send must succeed
+   val props = new Properties()
+   props.put("host", "localhost")
+   props.put("port", "9092")
+   props.put("buffer.size", "102400")
+   props.put("connect.timeout.ms", "500")
+   props.put("reconnect.interval", "1000")
+   val producer = new SyncProducer(new SyncProducerConfig(props))
+   try {
+     var failed = false
+     val firstStart = SystemTime.milliseconds
+     try {
+       producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+     } catch {
+       case e: Exception => failed = true
+     }
+     Assert.assertFalse(failed)
+     failed = false
+     val firstEnd = SystemTime.milliseconds
+     Assert.assertTrue((firstEnd-firstStart) < 500)
+
+     val secondStart = SystemTime.milliseconds
+     try {
+       producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+     } catch {
+       case e: Exception => failed = true
+     }
+     Assert.assertFalse(failed)
+     val secondEnd = SystemTime.milliseconds
+     // measure from secondStart; subtracting secondEnd from itself is always 0
+     Assert.assertTrue((secondEnd-secondStart) < 500)
+
+     // a multi-request send with a single request must succeed too
+     try {
+       producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
+     } catch {
+       case e: Exception => failed = true
+     }
+     Assert.assertFalse(failed)
+   } finally {
+     // release the connection even when an assertion fails
+     producer.close
+   }
+ }
+
+ @Test
+ def testReachableServerWrongPort() {
+   // port 9091 differs from the port the test broker is created with (9092); both sends must fail
+   val props = new Properties()
+   props.put("host", "localhost")
+   props.put("port", "9091")
+   props.put("buffer.size", "102400")
+   props.put("connect.timeout.ms", "300")
+   props.put("reconnect.interval", "500")
+   val producer = new SyncProducer(new SyncProducerConfig(props))
+   var failed = false
+   val firstStart = SystemTime.milliseconds
+   //temporarily increase log4j level to avoid error in output
+   simpleProducerLogger.setLevel(Level.FATAL)
+   try {
+     try {
+       producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+     } catch {
+       case e: Exception => failed = true
+     }
+     Assert.assertTrue(failed)
+     failed = false
+     val firstEnd = SystemTime.milliseconds
+     Assert.assertTrue((firstEnd-firstStart) < 300)
+
+     val secondStart = SystemTime.milliseconds
+     try {
+       producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
+     } catch {
+       case e: Exception => failed = true
+     }
+     Assert.assertTrue(failed)
+     val secondEnd = SystemTime.milliseconds
+     // measure from secondStart; subtracting secondEnd from itself is always 0
+     Assert.assertTrue((secondEnd-secondStart) < 300)
+   } finally {
+     // restore the logger level and release the producer even when an assertion fails
+     simpleProducerLogger.setLevel(Level.ERROR)
+     producer.close
+   }
+ }
+
+ @Test
+ def testMessageSizeTooLarge() {
+   // max.message.size is 100 while the payload below is 101 bytes long
+   val producerProps = new Properties()
+   producerProps.put("host", "localhost")
+   producerProps.put("port", "9091")
+   producerProps.put("buffer.size", "102400")
+   producerProps.put("connect.timeout.ms", "300")
+   producerProps.put("reconnect.interval", "500")
+   producerProps.put("max.message.size", "100")
+   val producer = new SyncProducer(new SyncProducerConfig(producerProps))
+   val oversizedPayload = new Array[Byte](101)
+
+   // true only if the send is rejected with MessageSizeTooLargeException; any other exception propagates
+   val rejected =
+     try {
+       producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(oversizedPayload)))
+       false
+     } catch {
+       case e: MessageSizeTooLargeException => true
+     }
+   Assert.assertTrue(rejected)
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import java.io.File
+import kafka.utils.Utils
+import kafka.api.FetchRequest
+import kafka.integration.ProducerConsumerTestHarness
+import kafka.producer.{SyncProducer, SyncProducerConfig}
+import kafka.consumer.SimpleConsumer
+import java.util.Properties
+import org.scalatest.junit.JUnitSuite
+import junit.framework.{Assert, TestCase}
+import org.junit.{After, Before, Test}
+import junit.framework.Assert._
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+/**
+ * Checks that a broker which is shut down cleanly leaves its clean-shutdown marker file in the
+ * log directory, and that messages sent before the shutdown can be fetched again after a new
+ * server instance is started on the same configuration.
+ */
+class ServerShutdownTest extends JUnitSuite {
+  val port = 9999
+
+  /* Upper bound on how long a fetch is retried before the test is failed instead of hanging */
+  private val fetchTimeoutMs = 10000L
+
+  @Test
+  def testCleanShutdown() {
+    val props = TestUtils.createBrokerConfig(0, port)
+    val config = new KafkaConfig(props) {
+      override val enableZookeeper = false
+    }
+
+    val host = "localhost"
+    val topic = "test"
+    val sent1 = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
+    val sent2 = new ByteBufferMessageSet(NoCompressionCodec, new Message("more".getBytes()), new Message("messages".getBytes()))
+
+    try {
+      // ---- phase 1: send some messages, then do a clean shutdown ----
+      val firstProducer = new SyncProducer(getProducerConfig(host, port, 64*1024, 100000, 10000))
+      val firstServer = new KafkaServer(config)
+      firstServer.startup()
+      try {
+        firstProducer.send(topic, sent1)
+        // rewind so sent1 can be iterated again when it is compared with the fetched data
+        sent1.getBuffer.rewind
+
+        Thread.sleep(200)
+      } finally {
+        // release the client socket and stop the broker even if the send failed
+        try {
+          firstProducer.close()
+        } finally {
+          firstServer.shutdown()
+        }
+      }
+      val cleanShutDownFile = new File(new File(config.logDir), firstServer.CLEAN_SHUTDOWN_FILE)
+      assertTrue(cleanShutDownFile.exists)
+
+      // ---- phase 2: bring the server back again and read the messages ----
+      val producer = new SyncProducer(getProducerConfig(host, port, 64*1024, 100000, 10000))
+      val consumer = new SimpleConsumer(host, port, 1000000, 64*1024)
+      val server = new KafkaServer(config)
+      server.startup()
+      try {
+        val recovered = fetchNonEmpty(consumer, topic, 0)
+        TestUtils.checkEquals(sent1.iterator, recovered.iterator)
+        val newOffset = recovered.validBytes
+
+        // send some more messages
+        producer.send(topic, sent2)
+        sent2.getBuffer.rewind
+
+        Thread.sleep(200)
+
+        val appended = fetchNonEmpty(consumer, topic, newOffset)
+        // only the payloads are compared here, not the offsets attached to them
+        TestUtils.checkEquals(sent2.map(m => m.message).iterator, appended.map(m => m.message).iterator)
+      } finally {
+        try {
+          producer.close()
+          consumer.close()
+        } finally {
+          server.shutdown()
+        }
+      }
+    } finally {
+      // always remove the log directory, also when an assertion above failed
+      Utils.rm(config.logDir)
+    }
+  }
+
+  /**
+   * Fetch from partition 0 of the topic at the given offset, retrying until the returned
+   * message set contains valid bytes. Fails the test once fetchTimeoutMs has elapsed.
+   */
+  private def fetchNonEmpty(consumer: SimpleConsumer, topic: String, offset: Long): ByteBufferMessageSet = {
+    val deadline = System.currentTimeMillis + fetchTimeoutMs
+    var fetched = consumer.fetch(new FetchRequest(topic, 0, offset, 10000))
+    while(fetched.validBytes == 0) {
+      if (System.currentTimeMillis > deadline)
+        Assert.fail("No messages fetched from topic " + topic + " at offset " + offset +
+                    " within " + fetchTimeoutMs + " ms")
+      Thread.sleep(10)
+      fetched = consumer.fetch(new FetchRequest(topic, 0, offset, 10000))
+    }
+    fetched
+  }
+
+  /**
+   * Build a SyncProducerConfig from the given connection settings.
+   */
+  private def getProducerConfig(host: String, port: Int, bufferSize: Int, connectTimeout: Int,
+                                reconnectInterval: Int): SyncProducerConfig = {
+    val props = new Properties()
+    props.put("host", host)
+    props.put("port", port.toString)
+    props.put("buffer.size", bufferSize.toString)
+    props.put("connect.timeout.ms", connectTimeout.toString)
+    props.put("reconnect.interval", reconnectInterval.toString)
+    new SyncProducerConfig(props)
+  }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.io._
+import java.net._
+import java.nio._
+import java.nio.channels._
+import java.util.Random
+import java.util.Properties
+import junit.framework.Assert._
+import kafka.server._
+import kafka.producer._
+import kafka.message._
+import org.I0Itec.zkclient.ZkClient
+import kafka.consumer.ConsumerConfig
+
+/**
+ * Utility functions to help with testing
+ */
+object TestUtils {
+
+ /* Alphabets from which randomString draws its characters */
+ val Letters = ('A' to 'Z').mkString + ('a' to 'z').mkString
+ val Digits = ('0' to '9').mkString
+ val LettersAndDigits = Letters + Digits
+
+ /* Generator with a fixed seed, so that the values it produces are the same on every run */
+ val seededRandom = new Random(192348092834L)
+ /* Unseeded generator; used for the names of temporary directories */
+ val random = new Random()
+
+ /**
+  * Choose a number of random available ports.
+  *
+  * Every port is found by binding a server socket to port 0; all sockets are closed again
+  * before the ports are returned.
+  * @param count The number of ports to choose
+  * @return The local ports the temporary sockets were bound to
+  */
+ def choosePorts(count: Int): List[Int] = {
+   // open all sockets first so that the chosen ports are distinct from each other
+   val probes = List.fill(count)(new ServerSocket(0))
+   val chosen = probes.map(_.getLocalPort)
+   probes.foreach(_.close())
+   chosen
+ }
+
+ /**
+  * Choose a single available port
+  * @return The first (and only) port returned by choosePorts(1)
+  */
+ def choosePort(): Int = {
+   val ports = choosePorts(1)
+   ports.head
+ }
+
+ /**
+  * Create a new, empty temporary directory named "kafka-<random number>" below the
+  * directory given by the "java.io.tmpdir" system property.
+  *
+  * A candidate name is only accepted if the directory was actually created by this call, so a
+  * directory left behind by an earlier run is never handed out again.
+  * @return The newly created directory, registered for deletion on JVM exit
+  * @throws IOException if no directory could be created after a bounded number of attempts
+  */
+ def tempDir(): File = {
+   val ioDir = System.getProperty("java.io.tmpdir")
+   val maxAttempts = 100
+   val candidates = Iterator.continually(new File(ioDir, "kafka-" + random.nextInt(1000000)))
+   // mkdirs() returns false when the directory already exists, which rejects name collisions
+   val created = candidates.take(maxAttempts).find(_.mkdirs())
+   val f = created.getOrElse(
+     throw new IOException("Could not create a new temporary directory under " + ioDir))
+   // NOTE: deleteOnExit only removes the directory if it is empty at JVM exit
+   f.deleteOnExit()
+   f
+ }
+
+ /**
+  * Create a temporary file with prefix "kafka" and suffix ".tmp"
+  * @return The new file, registered for deletion on JVM exit
+  */
+ def tempFile(): File = {
+   val created = File.createTempFile("kafka", ".tmp")
+   created.deleteOnExit()
+   created
+ }
+
+ /**
+  * Create a temporary file (see tempFile) and open it for reading and writing
+  * @return The file channel of the opened file
+  */
+ def tempChannel(): FileChannel = {
+   val backingFile = new RandomAccessFile(tempFile(), "rw")
+   backingFile.getChannel()
+ }
+
+ /**
+  * Create a kafka server instance from the given configuration and start it
+  * @param config The configuration of the server
+  * @return The server, after startup() has been called on it
+  */
+ def createServer(config: KafkaConfig): KafkaServer = {
+   val started = new KafkaServer(config)
+   started.startup()
+   started
+ }
+
+ /**
+  * Create test configs for a number of brokers. Every broker gets its own freshly chosen
+  * port; the node ids are 0, 1, ... in the order of the returned list.
+  * @param numConfigs The number of broker configs to create
+  */
+ def createBrokerConfigs(numConfigs: Int): List[Properties] = {
+   val portsWithNodeIds = choosePorts(numConfigs).zipWithIndex
+   portsWithNodeIds.map { case (port, node) => createBrokerConfig(node, port) }
+ }
+
+ /**
+  * Create a test config for a single broker. The log directory is a new temporary directory
+  * and the zookeeper connect string is taken from TestZKUtils.
+  * @param nodeId The value of the "brokerid" property
+  * @param port The value of the "port" property
+  */
+ def createBrokerConfig(nodeId: Int, port: Int): Properties = {
+   val settings = List(
+     "brokerid" -> nodeId.toString,
+     "port" -> port.toString,
+     "log.dir" -> TestUtils.tempDir().getAbsolutePath,
+     "log.flush.interval" -> "1",
+     "zk.connect" -> TestZKUtils.zookeeperConnect)
+   val props = new Properties
+   for ((key, value) <- settings)
+     props.put(key, value)
+   props
+ }
+
+ /**
+  * Create a test config for a consumer
+  * @param zkConnect The value of "zk.connect"
+  * @param groupId The value of "groupid"
+  * @param consumerId The value of "consumerid"
+  * @param consumerTimeout The value of "consumer.timeout.ms"; -1 if not given
+  * @return The properties above plus fixed zookeeper session/sync times and autocommit interval
+  */
+ def createConsumerProperties(zkConnect: String, groupId: String, consumerId: String,
+                              consumerTimeout: Long = -1): Properties = {
+   val settings = List(
+     "zk.connect" -> zkConnect,
+     "groupid" -> groupId,
+     "consumerid" -> consumerId,
+     "consumer.timeout.ms" -> consumerTimeout.toString,
+     "zk.sessiontimeout.ms" -> "400",
+     "zk.synctime.ms" -> "200",
+     "autocommit.interval.ms" -> "1000")
+   val props = new Properties
+   for ((key, value) <- settings)
+     props.put(key, value)
+   props
+ }
+
+ /**
+  * Wrap a payload in a message and the message in an uncompressed message set
+  * @param payload The bytes of the message
+  */
+ def singleMessageSet(payload: Array[Byte]) = {
+   val message = new Message(payload)
+   new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message)
+ }
+
+ /**
+  * Generate an array of random bytes, drawn from the seeded generator
+  * @param numBytes The size of the array
+  * @return A new array of numBytes random bytes
+  */
+ def randomBytes(numBytes: Int): Array[Byte] = {
+   val filled = Array.ofDim[Byte](numBytes)
+   seededRandom.nextBytes(filled)
+   filled
+ }
+
+ /**
+  * Generate a random string of letters and digits of the given length. The characters are
+  * drawn from the seeded generator.
+  * @param len The length of the string
+  * @return The random string
+  */
+ def randomString(len: Int): String = {
+   val alphabetSize = LettersAndDigits.length
+   val chars = (0 until len).map(_ => LettersAndDigits.charAt(seededRandom.nextInt(alphabetSize)))
+   chars.mkString
+ }
+
+ /**
+  * Check that the buffer content from buffer.position() to buffer.limit() is equal.
+  *
+  * Each buffer is read relative to its own position, so the two buffers may start at
+  * different positions. Neither position is modified.
+  * @param b1 The first buffer
+  * @param b2 The second buffer
+  */
+ def checkEquals(b1: ByteBuffer, b2: ByteBuffer) {
+   assertEquals("Buffers should have equal length", b1.remaining, b2.remaining)
+   for(i <- 0 until b1.remaining)
+     // absolute reads: index b2 from b2's own position, not from b1's
+     assertEquals("byte " + i + " byte not equal.", b1.get(b1.position + i), b2.get(b2.position + i))
+ }
+
+ /**
+  * Throw an exception if the two iterators are of differing lengths or contain
+  * different messages on their Nth element. Both iterators are consumed.
+  * @param expected The elements that should be seen
+  * @param actual The elements that were seen
+  */
+ def checkEquals[T](expected: Iterator[T], actual: Iterator[T]) {
+   // number of element pairs compared so far
+   var compared = 0
+   while(expected.hasNext && actual.hasNext) {
+     compared += 1
+     assertEquals(expected.next, actual.next)
+   }
+
+   // at most one of the iterators can have elements left at this point
+   if (expected.hasNext) {
+     val expectedLength = compared + expected.size
+     fail("Iterators have uneven length-- first has more: " + expectedLength + " > " + compared)
+   }
+
+   if (actual.hasNext) {
+     val actualLength = compared + actual.size
+     fail("Iterators have uneven length-- second has more: " + actualLength + " > " + compared)
+   }
+ }
+
+ /**
+  * Throw an exception if an iterator yields a different number of elements than expected.
+  * The iterator is consumed.
+  * @param s1 The iterator to count
+  * @param expectedLength The number of elements the iterator should yield
+  */
+ def checkLength[T](s1: Iterator[T], expectedLength:Integer) {
+   val n: Int = s1.size
+   assertEquals(expectedLength, n)
+ }
+
+ /**
+  * Throw an exception if the two java iterators are of differing lengths or contain
+  * different messages on their Nth element
+  * @param s1 The first iterator
+  * @param s2 The second iterator
+  */
+ def checkEquals[T](s1: java.util.Iterator[T], s2: java.util.Iterator[T]) {
+   var bothHaveMore = s1.hasNext && s2.hasNext
+   while(bothHaveMore) {
+     assertEquals(s1.next, s2.next)
+     bothHaveMore = s1.hasNext && s2.hasNext
+   }
+   // whichever iterator still has elements is the longer one
+   if (s1.hasNext)
+     fail("Iterators have uneven length--first has more")
+   if (s2.hasNext)
+     fail("Iterators have uneven length--second has more")
+ }
+
+ /**
+  * Concatenate the given iterators into one iterator that yields all elements of the first
+  * iterator, then all elements of the second, and so on. Empty iterators are skipped.
+  *
+  * The given iterators are consumed lazily, as the returned iterator is advanced. Calling
+  * next() without a preceding hasNext is supported.
+  * @param s The iterators to concatenate, in order
+  */
+ def stackedIterator[T](s: Iterator[T]*): Iterator[T] =
+   s.iterator.flatMap(inner => inner)
+
+ /**
+  * Create a hexadecimal string for the given bytes
+  * @param bytes The bytes to format
+  * @return "0x" followed by two lower-case hex digits per byte
+  */
+ def hexString(bytes: Array[Byte]): String = hexString(ByteBuffer.wrap(bytes))
+
+ /**
+  * Create a hexadecimal string for the bytes between the buffer's position and its limit.
+  * The position of the buffer is not modified.
+  * @param buffer The buffer to format
+  * @return "0x" followed by two lower-case hex digits per byte
+  */
+ def hexString(buffer: ByteBuffer): String = {
+   val builder = new StringBuilder("0x")
+   // only the bytes in [position, limit) are formatted, so iterate over remaining, not limit
+   for(i <- 0 until buffer.remaining) {
+     // mask to 0..255 so negative bytes are not sign-extended; pad to two digits per byte
+     val unsigned = buffer.get(buffer.position + i) & 0xff
+     builder.append(String.format("%02x", Integer.valueOf(unsigned)))
+   }
+   builder.toString
+ }
+
+ /**
+  * Create a sync producer for the given host and port, using fixed test values for the
+  * buffer size, connect timeout and reconnect interval
+  * @param host The value of the "host" property
+  * @param port The value of the "port" property
+  */
+ def createProducer(host: String, port: Int): SyncProducer = {
+   val settings = List(
+     "host" -> host,
+     "port" -> port.toString,
+     "buffer.size" -> "65536",
+     "connect.timeout.ms" -> "100000",
+     "reconnect.interval" -> "10000")
+   val props = new Properties()
+   for ((key, value) <- settings)
+     props.put(key, value)
+   new SyncProducer(new SyncProducerConfig(props))
+ }
+
+ /**
+  * Write the given offset, as a string, to a persistent zookeeper path. A zookeeper client is
+  * opened from the consumer config for this one update and closed again afterwards.
+  * @param config Supplies the zookeeper connect string and timeouts
+  * @param path The zookeeper path to update
+  * @param offset The offset to store
+  */
+ def updateConsumerOffset(config : ConsumerConfig, path : String, offset : Long) = {
+   val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, StringSerializer)
+   try {
+     ZkUtils.updatePersistentPath(zkClient, path, offset.toString)
+   } finally {
+     // do not leak the zookeeper connection, also when the update fails
+     zkClient.close()
+   }
+ }
+}
+
+/**
+ * Zookeeper settings shared by the tests
+ */
+object TestZKUtils {
+ // "host:port" connect string; TestUtils.createBrokerConfig stores it under "zk.connect"
+ val zookeeperConnect = "127.0.0.1:2182"
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/utils/UtilsTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import org.apache.log4j.Logger
+import org.scalatest.junit.JUnitSuite
+import org.junit.Test
+
+/**
+ * Tests for Utils
+ */
+class UtilsTest extends JUnitSuite {
+
+ private val logger = Logger.getLogger(classOf[UtilsTest])
+
+ // Hands Utils.swallow an expression that throws IllegalStateException. There is no assertion:
+ // the test only fails if an exception propagates out of the call.
+ // NOTE(review): presumably swallow reports the exception through the given log method
+ // (logger.info here) and suppresses it -- confirm against Utils.swallow.
+ @Test
+ def testSwallow() {
+ Utils.swallow(logger.info, throw new IllegalStateException("test"))
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import org.apache.zookeeper.server.ZooKeeperServer
+import org.apache.zookeeper.server.NIOServerCnxn
+import kafka.utils.TestUtils
+import org.I0Itec.zkclient.ZkClient
+import java.net.InetSocketAddress
+import kafka.utils.{Utils, StringSerializer}
+
+/**
+ * A zookeeper server for tests, started on construction and listening on 127.0.0.1 at the
+ * port taken from the connect string, together with a ZkClient connected through that string.
+ * Snapshots and logs are kept in two new temporary directories.
+ *
+ * @param connectString Connect string of the form "host:port"; the part after the first ':'
+ *                      is the port the server is bound to
+ */
+class EmbeddedZookeeper(val connectString: String) {
+  val snapshotDir = TestUtils.tempDir()
+  val logDir = TestUtils.tempDir()
+  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 3000)
+  val port = connectString.split(":")(1).toInt
+  val factory = new NIOServerCnxn.Factory(new InetSocketAddress("127.0.0.1", port))
+  factory.startup(zookeeper)
+  // the client is created only after the server has been started
+  val client = new ZkClient(connectString)
+  client.setZkSerializer(StringSerializer)
+
+  /**
+   * Close the client, stop the server and delete its log and snapshot directories.
+   */
+  def shutdown() {
+    try {
+      // close our own client first so that its connection is not leaked
+      client.close()
+    } finally {
+      // stop the server and clean up even if closing the client failed
+      factory.shutdown()
+      Utils.rm(logDir)
+      Utils.rm(snapshotDir)
+    }
+  }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import kafka.consumer.ConsumerConfig
+import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{ZkUtils, StringSerializer}
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.junit.Assert
+import org.scalatest.junit.JUnit3Suite
+
+/**
+ * Checks that an ephemeral zookeeper node is gone once the client that created it has been
+ * closed and the session timeout has passed.
+ */
+class ZKEphemeralTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val zkConnect = TestZKUtils.zookeeperConnect
+  var zkSessionTimeoutMs = 1000
+
+  def testEphemeralNodeCleanup = {
+    val config = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, "test", "1"))
+
+    // first client: creates the ephemeral node and checks that it can be read back
+    val creator = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+                               StringSerializer)
+    try {
+      try {
+        ZkUtils.createEphemeralPathExpectConflict(creator, "/tmp/zktest", "node created")
+      } catch {
+        case e: Exception => println("Exception in creating ephemeral node")
+      }
+
+      val testData: String = ZkUtils.readData(creator, "/tmp/zktest")
+      Assert.assertNotNull(testData)
+    } finally {
+      // closing the client ends the session the ephemeral node belongs to
+      creator.close
+    }
+
+    Thread.sleep(zkSessionTimeoutMs)
+
+    // second client: the node must no longer exist
+    val observer = new ZkClient(zkConnect, zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+                                StringSerializer)
+    try {
+      val nodeExists = ZkUtils.pathExists(observer, "/tmp/zktest")
+      Assert.assertFalse(nodeExists)
+    } finally {
+      observer.close
+    }
+  }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZKLoadBalanceTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import junit.framework.Assert._
+import java.util.Collections
+import kafka.consumer.{ConsumerConfig, ZookeeperConsumerConnector}
+import java.lang.Thread
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{TestUtils, ZkUtils, ZKGroupTopicDirs, TestZKUtils}
+
+/**
+ * Checks the partition owner registry in zookeeper while consumers of one group join and
+ * partitions of the topic are added and deleted.
+ */
+class ZKLoadBalanceTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val zkConnect = TestZKUtils.zookeeperConnect
+  var dirs : ZKGroupTopicDirs = null
+  val topic = "topic1"
+  val group = "group1"
+  val firstConsumer = "consumer1"
+  val secondConsumer = "consumer2"
+
+  override def setUp() {
+    super.setUp()
+
+    dirs = new ZKGroupTopicDirs(group, topic)
+  }
+
+  def testLoadBalance() {
+    // create the first partition
+    ZkUtils.setupPartition(zookeeper.client, 400, "broker1", 1111, topic, 1)
+    // add the first consumer
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, firstConsumer))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, false)
+    try {
+      zkConsumerConnector1.createMessageStreams(Map(topic -> 1))
+
+      // check Partition Owner Registry: the only partition belongs to the first consumer
+      val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
+      val expected_1 = List( ("400-0", "group1_consumer1-0") )
+      checkSetEqual(actual_1, expected_1)
+
+      // add a second consumer
+      val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, secondConsumer))
+      val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, false)
+      try {
+        zkConsumerConnector2.createMessageStreams(Map(topic -> 1))
+        // wait a bit to make sure rebalancing logic is triggered
+        Thread.sleep(200)
+
+        // check Partition Owner Registry: with a single partition the owner is unchanged
+        val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
+        val expected_2 = List( ("400-0", "group1_consumer1-0") )
+        checkSetEqual(actual_2, expected_2)
+
+        // add a few more partitions
+        val brokers = List(
+          (200, "broker2", 1111, topic, 2),
+          (300, "broker3", 1111, topic, 2) )
+
+        for ((brokerID, host, port, brokerTopic, nParts) <- brokers)
+          ZkUtils.setupPartition(zookeeper.client, brokerID, host, port, brokerTopic, nParts)
+
+        // wait a bit to make sure rebalancing logic is triggered
+        Thread.sleep(1000)
+        // check Partition Owner Registry
+        val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
+        val expected_3 = List( ("200-0", "group1_consumer1-0"),
+                               ("200-1", "group1_consumer1-0"),
+                               ("300-0", "group1_consumer1-0"),
+                               ("300-1", "group1_consumer2-0"),
+                               ("400-0", "group1_consumer2-0") )
+        checkSetEqual(actual_3, expected_3)
+
+        // now delete a partition
+        ZkUtils.deletePartition(zookeeper.client, 400, topic)
+
+        // wait a bit to make sure rebalancing logic is triggered
+        Thread.sleep(500)
+        // check Partition Owner Registry
+        val actual_4 = getZKChildrenValues(dirs.consumerOwnerDir)
+        val expected_4 = List( ("200-0", "group1_consumer1-0"),
+                               ("200-1", "group1_consumer1-0"),
+                               ("300-0", "group1_consumer2-0"),
+                               ("300-1", "group1_consumer2-0") )
+        checkSetEqual(actual_4, expected_4)
+      } finally {
+        // shut the connector down also when one of the checks above failed
+        zkConsumerConnector2.shutdown
+      }
+    } finally {
+      zkConsumerConnector1.shutdown
+    }
+  }
+
+  /**
+   * Read all children of the given zookeeper path together with their data.
+   * @return (child name, child data) pairs, sorted by child name
+   */
+  private def getZKChildrenValues(path : String) : Seq[Tuple2[String,String]] = {
+    import scala.collection.JavaConversions
+    val children = zookeeper.client.getChildren(path)
+    // sorted in place so that the result can be compared position by position
+    Collections.sort(children)
+    val childrenAsSeq : Seq[java.lang.String] = JavaConversions.asBuffer(children)
+    childrenAsSeq.map(partition =>
+      (partition, zookeeper.client.readData(path + "/" + partition).asInstanceOf[String]))
+  }
+
+  /**
+   * Fail unless both sequences have the same length and equal pairs at every position.
+   */
+  private def checkSetEqual(actual : Seq[Tuple2[String,String]], expected : Seq[Tuple2[String,String]]) {
+    assertEquals(expected.length, actual.length)
+    for (i <- 0 until expected.length) {
+      assertEquals(expected(i)._1, actual(i)._1)
+      assertEquals(expected(i)._2, actual(i)._2)
+    }
+  }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.zk
+
+import org.scalatest.junit.JUnit3Suite
+
+/**
+ * Mix-in for JUnit3 suites that need a zookeeper server: an EmbeddedZookeeper is created for
+ * the suite's connect string before every test and shut down after it.
+ */
+trait ZooKeeperTestHarness extends JUnit3Suite {
+  /* Connect string ("host:port") the embedded zookeeper is created with */
+  val zkConnect: String
+  var zookeeper: EmbeddedZookeeper = null
+
+  override def setUp() {
+    // zookeeper is created first, before the rest of the setUp chain runs
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    super.setUp
+  }
+
+  override def tearDown() {
+    try {
+      super.tearDown
+    } finally {
+      // still stop zookeeper if super.tearDown failed; it is null if setUp never created it
+      if (zookeeper != null)
+        zookeeper.shutdown()
+    }
+  }
+
+}