Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC
svn commit: r1152970 [21/26] - in /incubator/kafka: branches/ site/ trunk/
trunk/bin/ trunk/clients/ trunk/clients/clojure/
trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/
trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/package.html Mon Aug 1 23:41:24 2011
@@ -0,0 +1 @@
+Utility functions.
\ No newline at end of file
Added: incubator/kafka/trunk/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/resources/log4j.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/resources/log4j.properties (added)
+++ incubator/kafka/trunk/core/src/test/resources/log4j.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,9 @@
+log4j.rootLogger=OFF, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Kafka's own logging is turned off for tests; raise this level to see debugging info
+log4j.logger.kafka=OFF
+
Added: incubator/kafka/trunk/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/kafka/trunk/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/kafka/trunk/core/src/test/scala/other/kafka.log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka.log4j.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka.log4j.properties (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka.log4j.properties Mon Aug 1 23:41:24 2011
@@ -0,0 +1,8 @@
+log4j.rootLogger=INFO, KAFKA
+
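+# KafkaAppender publishes each log4j event as a message to the broker and topic configured below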
+log4j.appender.KAFKA=kafka.log4j.KafkaAppender
+
+log4j.appender.KAFKA.Port=9092
+log4j.appender.KAFKA.Host=localhost
+log4j.appender.KAFKA.Topic=test-logger
+log4j.appender.KAFKA.Serializer=kafka.AppenderStringSerializer
Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka
+
+import consumer.ConsumerConfig
+import utils.{StringSerializer, ZkUtils, Utils}
+import org.I0Itec.zkclient.ZkClient
+
+object DeleteZKPath {
+ def main(args: Array[String]) {
+ if(args.length < 2) {
+ println("USAGE: " + DeleteZKPath.getClass.getName + " consumer.properties zk_path")
+ System.exit(1)
+ }
+
+ val config = new ConsumerConfig(Utils.loadProps(args(0)))
+ val zkPath = args(1)
+
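+    // connect to ZooKeeper using the timeouts from the consumer config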
+ val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+ StringSerializer)
+
+ try {
+ ZkUtils.deletePathRecursive(zkClient, zkPath);
+ System.out.println(zkPath + " is deleted")
+ } catch {
+      case e: Exception =>
+        System.err.println("Path not deleted: " + e.getMessage)
+        e.printStackTrace()
+ }
+
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka
+
+import message.Message
+import org.apache.log4j.{Logger, PropertyConfigurator}
+import serializer.Encoder
+
+object TestKafkaAppender {
+
+ private val logger = Logger.getLogger(TestKafkaAppender.getClass)
+
+ def main(args:Array[String]) {
+
+ if(args.length < 1) {
+ println("USAGE: " + TestKafkaAppender.getClass.getName + " log4j_config")
+ System.exit(1)
+ }
+
+ try {
+ PropertyConfigurator.configure(args(0))
+ } catch {
+      case e: Exception => System.err.println("KafkaAppender could not be initialized! Exiting...")
+ e.printStackTrace()
+ System.exit(1)
+ }
+
+ for(i <- 1 to 10)
+ logger.info("test")
+ }
+}
+
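+// Encoder used by the KafkaAppender: wraps the logged string's bytes in a kafka Message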
+class AppenderStringSerializer extends Encoder[AnyRef] {
+ def toMessage(event: AnyRef):Message = new Message(event.asInstanceOf[String].getBytes)
+}
+
Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import java.net.URI
+import java.util.Arrays.asList
+import java.io._
+import java.nio._
+import java.nio.channels._
+import joptsimple._
+
+object TestLinearWriteSpeed {
+
+ def main(args: Array[String]): Unit = {
+ val parser = new OptionParser
+ val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.")
+ .withRequiredArg
+ .describedAs("num_bytes")
+ .ofType(classOf[java.lang.Integer])
+ val sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
+ .withRequiredArg
+ .describedAs("num_bytes")
+ .ofType(classOf[java.lang.Integer])
+ val filesOpt = parser.accepts("files", "REQUIRED: The number of files.")
+ .withRequiredArg
+ .describedAs("num_files")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(1)
+
+ val options = parser.parse(args : _*)
+
+ for(arg <- List(bytesOpt, sizeOpt, filesOpt)) {
+ if(!options.has(arg)) {
+ System.err.println("Missing required argument \"" + arg + "\"")
+ parser.printHelpOn(System.err)
+ System.exit(1)
+ }
+ }
+
+ val bytesToWrite = options.valueOf(bytesOpt).intValue
+ val bufferSize = options.valueOf(sizeOpt).intValue
+ val numFiles = options.valueOf(filesOpt).intValue
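+    // allocate a single reusable buffer and fill it with a constant byte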
+ val buffer = ByteBuffer.allocate(bufferSize)
+ while(buffer.hasRemaining)
+ buffer.put(123.asInstanceOf[Byte])
+
+ val channels = new Array[FileChannel](numFiles)
+ for(i <- 0 until numFiles) {
+ val file = File.createTempFile("kafka-test", ".dat")
+ file.deleteOnExit()
+ channels(i) = new RandomAccessFile(file, "rw").getChannel()
+ }
+
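+    // write the buffer repeatedly, rotating round-robin across the files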
+ val begin = System.currentTimeMillis
+ for(i <- 0 until bytesToWrite / bufferSize) {
+ buffer.rewind()
+ channels(i % numFiles).write(buffer)
+ }
+    val elapsedSecs = (System.currentTimeMillis - begin) / 1000.0
+    System.out.println(bytesToWrite / (1024 * 1024 * elapsedSecs) + " MB per sec")
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import kafka.log._
+import kafka.message._
+import kafka.utils.{TestUtils, Utils}
+
+object TestLogPerformance {
+
+ def main(args: Array[String]): Unit = {
+ if(args.length < 4)
+ Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size batch_size compression_codec")
+ val numMessages = args(0).toInt
+ val messageSize = args(1).toInt
+ val batchSize = args(2).toInt
+ val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
+ val dir = TestUtils.tempDir()
+ val log = new Log(dir, 50*1024*1024, 5000000, false)
+ val bytes = new Array[Byte](messageSize)
+ new java.util.Random().nextBytes(bytes)
+ val message = new Message(bytes)
+ val messages = new Array[Message](batchSize)
+ for(i <- 0 until batchSize)
+ messages(i) = message
+ val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages: _*)
+ val numBatches = numMessages / batchSize
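+    // time appending the same batch to the log numBatches times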
+ val start = System.currentTimeMillis()
+ for(i <- 0 until numBatches)
+ log.append(messageSet)
+ log.close()
+    val elapsed = (System.currentTimeMillis() - start) / 1000.0
+    val writtenBytes = MessageSet.entrySize(message) * numMessages
+    println("message size = " + MessageSet.entrySize(message))
+    println("MB/sec: " + writtenBytes / elapsed / (1024.0 * 1024.0))
+ Utils.rm(dir)
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestTruncate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestTruncate.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestTruncate.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestTruncate.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import java.io._
+import java.nio._
+
+/* This code tests the correct functioning of Java's FileChannel.truncate; on some platforms it does not work. */
+object TestTruncate {
+
+ def main(args: Array[String]): Unit = {
+ val name = File.createTempFile("kafka", ".test")
+ name.deleteOnExit()
+ val file = new RandomAccessFile(name, "rw").getChannel()
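+    // write three 4-byte ints, then truncate the file back to a single int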
+ val buffer = ByteBuffer.allocate(12)
+ buffer.putInt(4).putInt(4).putInt(4)
+ buffer.rewind()
+ file.write(buffer)
+ println("position prior to truncate: " + file.position)
+ file.truncate(4)
+ println("position after truncate to 4: " + file.position)
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import consumer._
+import utils.Utils
+import java.util.concurrent.CountDownLatch
+
+object TestZKConsumerOffsets {
+ def main(args: Array[String]): Unit = {
+    if(args.length < 3) {
+ println("USAGE: " + TestZKConsumerOffsets.getClass.getName + " consumer.properties topic latest")
+ System.exit(1)
+ }
+ println("Starting consumer...")
+ val topic = args(1)
+ val autoOffsetReset = args(2)
+ val props = Utils.loadProps(args(0))
+ props.put("autooffset.reset", "largest")
+
+ val config = new ConsumerConfig(props)
+ val consumerConnector: ConsumerConnector = Consumer.create(config)
+ val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
+ var threadList = List[ConsumerThread]()
+ for ((topic, streamList) <- topicMessageStreams)
+ for (stream <- streamList)
+ threadList ::= new ConsumerThread(stream)
+
+ for (thread <- threadList)
+ thread.start
+
+ // attach shutdown handler to catch control-c
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ override def run() = {
+ consumerConnector.shutdown
+ threadList.foreach(_.shutdown)
+ println("consumer threads shutted down")
+ }
+ })
+ }
+}
+
+private class ConsumerThread(stream: KafkaMessageStream) extends Thread {
+ val shutdownLatch = new CountDownLatch(1)
+
+ override def run() {
+ println("Starting consumer thread..")
+ for (message <- stream) {
+ println("consumed: " + Utils.toString(message.payload, "UTF-8"))
+ }
+ shutdownLatch.countDown
+ println("thread shutdown !" )
+ }
+
+ def shutdown() {
+ shutdownLatch.await
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import junit.framework.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+import kafka.cluster.Partition
+
+
+class TopicCountTest extends JUnitSuite {
+
+ @Test
+ def testBasic() {
+    val consumer = "consumer1"
+ val json = """{ "topic1" : 2, "topic2" : 3 }"""
+ val topicCount = TopicCount.constructTopicCount(consumer, json)
+ val topicCountMap = Map(
+ "topic1" -> 2,
+ "topic2" -> 3
+ )
+ val expectedTopicCount = new TopicCount(consumer, topicCountMap)
+ assertTrue(expectedTopicCount == topicCount)
+
+ val topicCount2 = TopicCount.constructTopicCount(consumer, expectedTopicCount.toJsonString)
+ assertTrue(expectedTopicCount == topicCount2)
+ }
+
+ @Test
+ def testPartition() {
+ assertTrue(new Partition(10, 0) == new Partition(10, 0))
+ assertTrue(new Partition(10, 1) != new Partition(10, 0))
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import junit.framework.Assert._
+import kafka.zk.ZooKeeperTestHarness
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import scala.collection._
+import kafka.utils.Utils
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.scalatest.junit.JUnit3Suite
+import org.apache.log4j.{Level, Logger}
+import kafka.message._
+
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
+ private val logger = Logger.getLogger(getClass())
+
+ val zookeeperConnect = TestZKUtils.zookeeperConnect
+ val zkConnect = zookeeperConnect
+ val numNodes = 2
+ val numParts = 2
+ val topic = "topic1"
+ val configs =
+ for(props <- TestUtils.createBrokerConfigs(numNodes))
+ yield new KafkaConfig(props) {
+ override val enableZookeeper = true
+ override val numPartitions = numParts
+ override val zkConnect = zookeeperConnect
+ }
+ val group = "group1"
+ val consumer0 = "consumer0"
+ val consumer1 = "consumer1"
+ val consumer2 = "consumer2"
+ val consumer3 = "consumer3"
+ val nMessages = 2
+
+ def testBasic() {
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ var actualMessages: List[Message] = Nil
+
+ // test consumer timeout logic
+ val consumerConfig0 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+ override val consumerTimeoutMs = 200
+ }
+ val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+ val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+ try {
+ getMessages(nMessages*2, topicMessageStreams0)
+ fail("should get an exception")
+ }
+ catch {
+ case e: ConsumerTimeoutException => // this is ok
+ println("This is ok")
+ case e => throw e
+ }
+ zkConsumerConnector0.shutdown
+
+ // send some messages to each broker
+ val sentMessages1 = sendMessages(nMessages, "batch1")
+ // create a consumer
+ val consumerConfig1 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+ val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+ val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+ assertEquals(sentMessages1, receivedMessages1)
+ // commit consumed offsets
+ zkConsumerConnector1.commitOffsets
+
+ // create a consumer
+ val consumerConfig2 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+ val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+ val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+ // send some messages to each broker
+ val sentMessages2 = sendMessages(nMessages, "batch2")
+ Thread.sleep(200)
+ val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+ val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sentMessages2, receivedMessages2)
+
+ // create a consumer with empty map
+ val consumerConfig3 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer3))
+ val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+ val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+ // send some messages to each broker
+ Thread.sleep(200)
+ val sentMessages3 = sendMessages(nMessages, "batch3")
+ Thread.sleep(200)
+ val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+ val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sentMessages3, receivedMessages3)
+
+ zkConsumerConnector1.shutdown
+ zkConsumerConnector2.shutdown
+ zkConsumerConnector3.shutdown
+ logger.info("all consumer connectors stopped")
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testCompression() {
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ var actualMessages: List[Message] = Nil
+
+ // test consumer timeout logic
+ val consumerConfig0 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+ override val consumerTimeoutMs = 200
+ }
+ val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+ val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+ try {
+ getMessages(nMessages*2, topicMessageStreams0)
+ fail("should get an exception")
+ }
+ catch {
+ case e: ConsumerTimeoutException => // this is ok
+ println("This is ok")
+ case e => throw e
+ }
+ zkConsumerConnector0.shutdown
+
+ println("Sending messages for 1st consumer")
+ // send some messages to each broker
+ val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
+ // create a consumer
+ val consumerConfig1 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+ val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+ val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+ assertEquals(sentMessages1, receivedMessages1)
+ // commit consumed offsets
+ zkConsumerConnector1.commitOffsets
+
+ println("Sending more messages for 2nd consumer")
+ // create a consumer
+ val consumerConfig2 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+ val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+ val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+ // send some messages to each broker
+ val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
+ Thread.sleep(200)
+ val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+ val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sentMessages2, receivedMessages2)
+
+ // create a consumer with empty map
+ println("Sending more messages for 3rd consumer")
+ val consumerConfig3 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer3))
+ val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+ val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+ // send some messages to each broker
+ Thread.sleep(200)
+ val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec)
+ Thread.sleep(200)
+ val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+ val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sentMessages3, receivedMessages3)
+
+ zkConsumerConnector1.shutdown
+ zkConsumerConnector2.shutdown
+ zkConsumerConnector3.shutdown
+ logger.info("all consumer connectors stopped")
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testCompressionSetConsumption() {
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ var actualMessages: List[Message] = Nil
+
+ // shutdown one server
+ servers.last.shutdown
+ Thread.sleep(500)
+
+ // send some messages to each broker
+ val sentMessages = sendMessages(configs.head, 200, "batch1", DefaultCompressionCodec)
+ // test consumer timeout logic
+ val consumerConfig0 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+ override val consumerTimeoutMs = 5000
+ }
+ val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+ val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
+ getMessages(100, topicMessageStreams0)
+ zkConsumerConnector0.shutdown
+ // at this point, only some part of the message set was consumed. So consumed offset should still be 0
+ // also fetched offset should be 0
+ val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+ val receivedMessages = getMessages(400, topicMessageStreams1)
+ val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
+ val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sortedSentMessages, sortedReceivedMessages)
+ zkConsumerConnector1.shutdown
+
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
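+  // produce messagesPerNode messages to every partition on the given broker and collect what was sent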
+ def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec): List[Message]= {
+ var messages: List[Message] = Nil
+ val producer = TestUtils.createProducer("localhost", conf.port)
+ for (partition <- 0 until numParts) {
+ val ms = 0.until(messagesPerNode).map(x =>
+ new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
+ val mSet = new ByteBufferMessageSet(compressionCodec = compression, messages = ms: _*)
+ for (message <- ms)
+ messages ::= message
+ producer.send(topic, partition, mSet)
+ }
+ producer.close()
+ messages
+ }
+
+ def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[Message]= {
+ var messages: List[Message] = Nil
+ for(conf <- configs) {
+ messages ++= sendMessages(conf, messagesPerNode, header, compression)
+ }
+ messages.sortWith((s,t) => s.checksum < t.checksum)
+ }
+
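+  // consume nMessagesPerThread messages from each stream and return them sorted by checksum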
+ def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream]]): List[Message]= {
+ var messages: List[Message] = Nil
+ for ((topic, messageStreams) <- topicMessageStreams) {
+ for (messageStream <- messageStreams) {
+ val iterator = messageStream.iterator
+ for (i <- 0 until nMessagesPerThread) {
+ assertTrue(iterator.hasNext)
+ val message = iterator.next
+ messages ::= message
+ logger.debug("received message: " + Utils.toString(message.payload, "UTF-8"))
+ }
+ }
+ }
+ messages.sortWith((s,t) => s.checksum < t.checksum)
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import junit.framework.Assert._
+import kafka.zk.ZooKeeperTestHarness
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.atomic.AtomicInteger
+import kafka.utils.ZKGroupTopicDirs
+import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
+import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{TestUtils, TestZKUtils}
+
+class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+ val zkConnect = TestZKUtils.zookeeperConnect
+ val topic = "test_topic"
+ val group = "default_group"
+ val testConsumer = "consumer"
+ val brokerPort = 9892
+ val kafkaConfig = new KafkaConfig(TestUtils.createBrokerConfig(0, brokerPort))
+ var kafkaServer : KafkaServer = null
+ val numMessages = 10
+ val largeOffset = 10000
+ val smallOffset = -1
+
+ private val logger = Logger.getLogger(getClass())
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+ override def setUp() {
+ super.setUp()
+ kafkaServer = TestUtils.createServer(kafkaConfig)
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+ }
+
+ override def tearDown() {
+    // restore the request handler logger level
+ requestHandlerLogger.setLevel(Level.ERROR)
+ kafkaServer.shutdown
+ super.tearDown
+ }
+
+ def testEarliestOffsetResetForward() = {
+ val producer = TestUtils.createProducer("localhost", brokerPort)
+
+ for(i <- 0 until numMessages) {
+ producer.send(topic, TestUtils.singleMessageSet("test".getBytes()))
+ }
+
+ // update offset in zookeeper for consumer to jump "forward" in time
+ val dirs = new ZKGroupTopicDirs(group, topic)
+ var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+ consumerProps.put("autooffset.reset", "smallest")
+ consumerProps.put("consumer.timeout.ms", "2000")
+ val consumerConfig = new ConsumerConfig(consumerProps)
+
+ TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", largeOffset)
+ logger.info("Updated consumer offset to " + largeOffset)
+
+ Thread.sleep(500)
+ val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+ val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
+
+ var threadList = List[Thread]()
+ val nMessages : AtomicInteger = new AtomicInteger(0)
+ for ((topic, streamList) <- messageStreams)
+ for (i <- 0 until streamList.length)
+ threadList ::= new Thread("kafka-zk-consumer-" + i) {
+ override def run() {
+
+ try {
+ for (message <- streamList(i)) {
+ nMessages.incrementAndGet
+ }
+ }
+ catch {
+ case te: ConsumerTimeoutException => logger.info("Consumer thread timing out..")
+ case _: InterruptedException =>
+ case _: ClosedByInterruptException =>
+ case e => throw e
+ }
+ }
+
+ }
+
+
+ for (thread <- threadList)
+ thread.start
+
+ threadList(0).join(2000)
+
+ logger.info("Asserting...")
+ assertEquals(numMessages, nMessages.get)
+ consumerConnector.shutdown
+ }
+
+ def testEarliestOffsetResetBackward() = {
+ val producer = TestUtils.createProducer("localhost", brokerPort)
+
+ for(i <- 0 until numMessages) {
+ producer.send(topic, TestUtils.singleMessageSet("test".getBytes()))
+ }
+
+    // update offset in zookeeper for consumer to jump "backward" in time
+ val dirs = new ZKGroupTopicDirs(group, topic)
+ var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+ consumerProps.put("autooffset.reset", "smallest")
+ consumerProps.put("consumer.timeout.ms", "2000")
+ val consumerConfig = new ConsumerConfig(consumerProps)
+
+ TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", smallOffset)
+ logger.info("Updated consumer offset to " + smallOffset)
+
+
+ val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+ val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
+
+ var threadList = List[Thread]()
+ val nMessages : AtomicInteger = new AtomicInteger(0)
+ for ((topic, streamList) <- messageStreams)
+ for (i <- 0 until streamList.length)
+ threadList ::= new Thread("kafka-zk-consumer-" + i) {
+ override def run() {
+
+ try {
+ for (message <- streamList(i)) {
+ nMessages.incrementAndGet
+ }
+ }
+ catch {
+ case _: InterruptedException =>
+ case _: ClosedByInterruptException =>
+ case e => throw e
+ }
+ }
+
+ }
+
+
+ for (thread <- threadList)
+ thread.start
+
+ threadList(0).join(2000)
+
+ logger.info("Asserting...")
+ assertEquals(numMessages, nMessages.get)
+ consumerConnector.shutdown
+ }
+
+ def testLatestOffsetResetForward() = {
+ val producer = TestUtils.createProducer("localhost", brokerPort)
+
+ for(i <- 0 until numMessages) {
+ producer.send(topic, TestUtils.singleMessageSet("test".getBytes()))
+ }
+
+ // update offset in zookeeper for consumer to jump "forward" in time
+ val dirs = new ZKGroupTopicDirs(group, topic)
+ var consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+ consumerProps.put("autooffset.reset", "largest")
+ consumerProps.put("consumer.timeout.ms", "2000")
+ val consumerConfig = new ConsumerConfig(consumerProps)
+
+ TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", largeOffset)
+ logger.info("Updated consumer offset to " + largeOffset)
+
+
+ val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+ val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
+
+ var threadList = List[Thread]()
+ val nMessages : AtomicInteger = new AtomicInteger(0)
+ for ((topic, streamList) <- messageStreams)
+ for (i <- 0 until streamList.length)
+ threadList ::= new Thread("kafka-zk-consumer-" + i) {
+ override def run() {
+
+ try {
+ for (message <- streamList(i)) {
+ nMessages.incrementAndGet
+ }
+ }
+ catch {
+ case _: InterruptedException =>
+ case _: ClosedByInterruptException =>
+ case e => throw e
+ }
+ }
+
+ }
+
+
+ for (thread <- threadList)
+ thread.start
+
+ threadList(0).join(2000)
+
+ logger.info("Asserting...")
+
+ assertEquals(0, nMessages.get)
+ consumerConnector.shutdown
+ }
+
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import kafka.server.{KafkaServer, KafkaConfig}
+import org.scalatest.junit.JUnit3Suite
+import org.apache.log4j.Logger
+import java.util.Properties
+import kafka.consumer.SimpleConsumer
+import kafka.utils.{Utils, TestUtils}
+import kafka.api.{OffsetRequest, FetchRequest}
+import junit.framework.Assert._
+import java.io.File
+
+class BackwardsCompatibilityTest extends JUnit3Suite {
+
+ val topic = "MagicByte0"
+ val group = "default_group"
+ val testConsumer = "consumer"
+ val kafkaProps = new Properties
+ val host = "localhost"
+ val port = 9892
+ val loader = getClass.getClassLoader
+ val kafkaLogDir = loader.getResource("test-kafka-logs")
+ kafkaProps.put("brokerid", "12")
+ kafkaProps.put("port", port.toString)
+ kafkaProps.put("log.dir", kafkaLogDir.getPath)
+ val kafkaConfig =
+ new KafkaConfig(kafkaProps) {
+ override val enableZookeeper = false
+ }
+ var kafkaServer : KafkaServer = null
+ var simpleConsumer: SimpleConsumer = null
+
+ private val logger = Logger.getLogger(getClass())
+
+ override def setUp() {
+ super.setUp()
+ kafkaServer = TestUtils.createServer(kafkaConfig)
+ simpleConsumer = new SimpleConsumer(host, port, 1000000, 64*1024)
+ }
+
+ override def tearDown() {
+ simpleConsumer.close
+ kafkaServer.shutdown
+ super.tearDown
+ }
+
+ // test for reading data with magic byte 0
+ def testProtocolVersion0() {
+ val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
+ var fetchOffset: Long = 0L
+ var messageCount: Int = 0
+
+ while(fetchOffset < lastOffset(0)) {
+ val fetched = simpleConsumer.fetch(new FetchRequest(topic, 0, fetchOffset, 10000))
+ fetched.foreach(m => fetchOffset = m.offset)
+ messageCount += fetched.size
+ }
+ assertEquals(100, messageCount)
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+import junit.framework.Assert._
+
+import kafka.cluster._
+import kafka.message._
+import kafka.server._
+import org.scalatest.junit.JUnit3Suite
+import kafka.integration.KafkaServerTestHarness
+import kafka.utils.TestUtils
+
+class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
+
+ val numNodes = 2
+ val configs =
+ for(props <- TestUtils.createBrokerConfigs(numNodes))
+ yield new KafkaConfig(props) {
+ override val enableZookeeper = false
+ }
+ val messages = new mutable.HashMap[Int, ByteBufferMessageSet]
+ val topic = "topic"
+ val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+ val shutdown = ZookeeperConsumerConnector.shutdownCommand
+ val queue = new LinkedBlockingQueue[FetchedDataChunk]
+ val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+ c.brokerId,
+ new Partition(c.brokerId, 0),
+ queue,
+ new AtomicLong(0),
+ new AtomicLong(0),
+ new AtomicInteger(0)))
+
+ var fetcher: Fetcher = null
+
+ override def setUp() {
+ super.setUp
+ fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
+ fetcher.initConnections(topicInfos, cluster, Set(queue))
+ }
+
+ override def tearDown() {
+ fetcher.shutdown
+ super.tearDown
+ }
+
+ def testFetcher() {
+ val perNode = 2
+ var count = sendMessages(perNode)
+ fetch(count)
+ Thread.sleep(100)
+ assertQueueEmpty()
+ count = sendMessages(perNode)
+ fetch(count)
+ Thread.sleep(100)
+ assertQueueEmpty()
+ }
+
+ def assertQueueEmpty(): Unit = assertEquals(0, queue.size)
+
+ def sendMessages(messagesPerNode: Int): Int = {
+ var count = 0
+ for(conf <- configs) {
+ val producer = TestUtils.createProducer("localhost", conf.port)
+ val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray
+ val mSet = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = ms: _*)
+ messages += conf.brokerId -> mSet
+ producer.send(topic, mSet)
+ producer.close()
+ count += ms.size
+ }
+ count
+ }
+
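+  // drain the shared queue until the expected number of messages arrives; a two second stall fails the test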
+ def fetch(expected: Int) {
+ var count = 0
+ while(true) {
+ val chunk = queue.poll(2L, TimeUnit.SECONDS)
+ assertNotNull("Timed out waiting for data chunk " + (count + 1), chunk)
+ for(message <- chunk.messages)
+ count += 1
+ if(count == expected)
+ return
+ }
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import java.util.Properties
+import junit.framework.Assert._
+import kafka.producer._
+import kafka.consumer._
+import kafka.message._
+import kafka.server._
+import kafka.utils.{Utils, TestUtils}
+import org.scalatest.junit.JUnit3Suite
+
+/**
+ * A test harness that brings up some number of broker nodes
+ */
+trait KafkaServerTestHarness extends JUnit3Suite {
+
+ val configs: List[KafkaConfig]
+ var servers: List[KafkaServer] = null
+
+ override def setUp() {
+ if(configs.size <= 0)
+      throw new IllegalArgumentException("Must supply at least one server config.")
+ servers = configs.map(TestUtils.createServer(_))
+ super.setUp
+ }
+
+ override def tearDown() {
+ super.tearDown
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDir))
+ }
+
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import scala.collection._
+import junit.framework.Assert._
+import kafka.common.OffsetOutOfRangeException
+import kafka.api.{ProducerRequest, FetchRequest}
+import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{TestUtils, Utils}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+/**
+ * End to end tests of the primitive apis against a local server
+ */
+class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness {
+
+ val port = 9999
+ val props = TestUtils.createBrokerConfig(0, port)
+ val config = new KafkaConfig(props) {
+ override val enableZookeeper = false
+ }
+ val configs = List(config)
+ var servers: List[KafkaServer] = null
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+ override def setUp() {
+ super.setUp
+ if(configs.size <= 0)
+      throw new IllegalArgumentException("Must supply at least one server config.")
+ servers = configs.map(TestUtils.createServer(_))
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+ }
+
+ override def tearDown() {
+    // restore the request handler logger level
+ requestHandlerLogger.setLevel(Level.ERROR)
+
+ super.tearDown
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDir))
+ }
+
+ def testProduceAndFetch() {
+ // send some messages
+ val topic = "test"
+ val sent = new ByteBufferMessageSet(NoCompressionCodec,
+ new Message("hello".getBytes()), new Message("there".getBytes()))
+ producer.send(topic, sent)
+ sent.getBuffer.rewind
+ var fetched: ByteBufferMessageSet = null
+ while(fetched == null || fetched.validBytes == 0)
+ fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+ TestUtils.checkEquals(sent.iterator, fetched.iterator)
+
+ // send an invalid offset
+ var exceptionThrown = false
+ try {
+ val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
+ fetchedWithError.iterator
+ }
+ catch {
+ case e: OffsetOutOfRangeException => exceptionThrown = true
+ case e2 => throw e2
+ }
+ assertTrue(exceptionThrown)
+ }
+
+ def testProduceAndMultiFetch() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ {
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(NoCompressionCodec,
+ new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+ messages += topic -> set
+ producer.send(topic, set)
+ set.getBuffer.rewind
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(fetches: _*)
+ for((topic, resp) <- topics.zip(response.toList))
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+
+ {
+ // send some invalid offsets
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, 0, -1, 10000)
+
+ var exceptionThrown = false
+ try {
+ val responses = consumer.multifetch(fetches: _*)
+ for(resp <- responses)
+ resp.iterator
+ }
+ catch {
+ case e: OffsetOutOfRangeException => exceptionThrown = true
+ case e2 => throw e2
+ }
+ assertTrue(exceptionThrown)
+ }
+ }
+
+ def testMultiProduce() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ var produceList: List[ProducerRequest] = Nil
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(NoCompressionCodec,
+ new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+ messages += topic -> set
+ produceList ::= new ProducerRequest(topic, 0, set)
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+ producer.multiSend(produceList.toArray)
+
+ for (messageSet <- messages.values)
+ messageSet.getBuffer.rewind
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(fetches: _*)
+ for((topic, resp) <- topics.zip(response.toList))
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+
+ def testMultiProduceResend() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ var produceList: List[ProducerRequest] = Nil
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(NoCompressionCodec,
+ new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+ messages += topic -> set
+ produceList ::= new ProducerRequest(topic, 0, set)
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+ producer.multiSend(produceList.toArray)
+
+ // resend the same multisend
+ producer.multiSend(produceList.toArray)
+
+ for (messageSet <- messages.values)
+ messageSet.getBuffer.rewind
+
+ // wait a bit for produced message to be available
+ Thread.sleep(750)
+ val response = consumer.multifetch(fetches: _*)
+ for((topic, resp) <- topics.zip(response.toList))
+ TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
+ messages(topic).map(m => m.message).iterator),
+ resp.map(m => m.message).iterator)
+// TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).iterator, messages(topic).iterator), resp.iterator)
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,94 @@
+package kafka.log
+
+import kafka.server.KafkaConfig
+import java.io.File
+import java.nio.ByteBuffer
+import kafka.utils.Utils
+import kafka.api.FetchRequest
+import kafka.common.InvalidMessageSizeException
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig}
+import org.scalatest.junit.JUnit3Suite
+import kafka.integration.ProducerConsumerTestHarness
+import kafka.integration.KafkaServerTestHarness
+import org.apache.log4j.{Logger, Level}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness {
+ val zkConnect = TestZKUtils.zookeeperConnect
+ val port = 9999
+ val props = TestUtils.createBrokerConfig(0, port)
+ val config = new KafkaConfig(props) {
+ override val hostName = "localhost"
+ override val enableZookeeper = true
+ }
+ val configs = List(config)
+ val topic = "test"
+ val partition = 0
+
+ def testMessageSizeTooLarge() {
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ val fetcherLogger = Logger.getLogger(classOf[kafka.consumer.FetcherRunnable])
+
+ requestHandlerLogger.setLevel(Level.FATAL)
+ fetcherLogger.setLevel(Level.FATAL)
+
+ // send some messages
+ val sent1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("hello".getBytes()))
+ producer.send(topic, sent1)
+ Thread.sleep(200)
+
+ // corrupt the file on disk
+ val logFile = new File(config.logDir + File.separator + topic + "-" + partition, Log.nameFromOffset(0))
+ val byteBuffer = ByteBuffer.allocate(4)
+ byteBuffer.putInt(1000) // wrong message size
+ byteBuffer.rewind()
+ val channel = Utils.openChannel(logFile, true)
+ channel.write(byteBuffer)
+ channel.force(true)
+ channel.close
+
+ Thread.sleep(500)
+ // test SimpleConsumer
+ val messageSet = consumer.fetch(new FetchRequest(topic, partition, 0, 10000))
+ try {
+ for (msg <- messageSet)
+ fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
+ fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
+ }
+ catch {
+ case e: InvalidMessageSizeException => "This is good"
+ }
+
+ val messageSet2 = consumer.fetch(new FetchRequest(topic, partition, 0, 10000))
+ try {
+ for (msg <- messageSet2)
+ fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
+ fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
+ }
+ catch {
+ case e: InvalidMessageSizeException => println("This is good")
+ }
+
+ // test ZookeeperConsumer
+ val consumerConfig1 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, "group1", "consumer1", 10000))
+ val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1)
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+ try {
+ for ((topic, messageStreams) <- topicMessageStreams1)
+ for (message <- messageStreams(0))
+ fail("ZookeeperConsumer should not return any message since the log file is corrupted.")
+ fail("ZookeeperConsumer should have thrown an exception since the log file is corrupted.")
+ }
+ catch {
+ case e: InvalidMessageSizeException => // expected
+ case e: Exception => // also acceptable: other fetch errors signal the corruption too
+ }
+
+ zkConsumerConnector1.shutdown
+ requestHandlerLogger.setLevel(Level.ERROR)
+ fetcherLogger.setLevel(Level.ERROR)
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,261 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import scala.collection._
+import junit.framework.Assert._
+import kafka.api.{ProducerRequest, FetchRequest}
+import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException}
+import kafka.server.{KafkaRequestHandlers, KafkaConfig}
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.producer.{ProducerData, Producer, ProducerConfig}
+import kafka.serializer.StringDecoder
+import kafka.utils.TestUtils
+import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
+
+/**
+ * End-to-end tests of the primitive APIs against a local server
+ */
+class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
+
+ val port = 9999
+ val props = TestUtils.createBrokerConfig(0, port)
+ val config = new KafkaConfig(props) {
+ override val enableZookeeper = false
+ }
+ val configs = List(config)
+ val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+ def testDefaultEncoderProducerAndFetch() {
+ val topic = "test-topic"
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("broker.list", "0:localhost:9999")
+ val config = new ProducerConfig(props)
+
+ val stringProducer1 = new Producer[String, String](config)
+ stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message")))
+ Thread.sleep(200)
+
+ val fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+ assertTrue(fetched.iterator.hasNext)
+
+ val fetchedMessageAndOffset = fetched.iterator.next
+ val stringDecoder = new StringDecoder
+ val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
+ assertEquals("test-message", fetchedStringMessage)
+ }
+
+ def testDefaultEncoderProducerAndFetchWithCompression() {
+ val topic = "test-topic"
+ val props = new Properties()
+ props.put("serializer.class", "kafka.serializer.StringEncoder")
+ props.put("broker.list", "0:localhost:9999")
+ props.put("compression", "true")
+ val config = new ProducerConfig(props)
+
+ val stringProducer1 = new Producer[String, String](config)
+ stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message")))
+ Thread.sleep(200)
+
+ val fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+ assertTrue(fetched.iterator.hasNext)
+
+ val fetchedMessageAndOffset = fetched.iterator.next
+ val stringDecoder = new StringDecoder
+ val fetchedStringMessage = stringDecoder.toEvent(fetchedMessageAndOffset.message)
+ assertEquals("test-message", fetchedStringMessage)
+ }
+
+ def testProduceAndMultiFetch() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ {
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(NoCompressionCodec,
+ new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+ messages += topic -> set
+ producer.send(topic, set)
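+ // send() advances the set's underlying ByteBuffer, so rewind it before the
+ // same set is iterated again for comparison with the fetched response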
+ set.getBuffer.rewind
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+
+ // wait a bit for produced message to be available
+ Thread.sleep(700)
+ val response = consumer.multifetch(fetches: _*)
+ for((topic, resp) <- topics.zip(response.toList))
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ {
+ // send some invalid offsets
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, 0, -1, 10000)
+
+ try {
+ val responses = consumer.multifetch(fetches: _*)
+ for(resp <- responses)
+ resp.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
+ }
+
+ {
+ // send some invalid partitions
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, -1, 0, 10000)
+
+ try {
+ val responses = consumer.multifetch(fetches: _*)
+ for(resp <- responses)
+ resp.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: InvalidPartitionException => "this is good"
+ }
+ }
+
+ // restore the request handler logger to its original level
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testProduceAndMultiFetchWithCompression() {
+ // send some messages
+ val topics = List("test1", "test2", "test3");
+ {
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(DefaultCompressionCodec,
+ new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+ messages += topic -> set
+ producer.send(topic, set)
+ set.getBuffer.rewind
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(fetches: _*)
+ for((topic, resp) <- topics.zip(response.toList))
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+
+ // temporarily set request handler logger to a higher level
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ {
+ // send some invalid offsets
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, 0, -1, 10000)
+
+ try {
+ val responses = consumer.multifetch(fetches: _*)
+ for(resp <- responses)
+ resp.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: OffsetOutOfRangeException => "this is good"
+ }
+ }
+
+ {
+ // send some invalid partitions
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ for(topic <- topics)
+ fetches += new FetchRequest(topic, -1, 0, 10000)
+
+ try {
+ val responses = consumer.multifetch(fetches: _*)
+ for(resp <- responses)
+ resp.iterator
+ fail("expect exception")
+ }
+ catch {
+ case e: InvalidPartitionException => "this is good"
+ }
+ }
+
+ // restore the request handler logger to its original level
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testMultiProduce() {
+ // send some messages
+ val topics = List("test1", "test2", "test3")
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ var produceList: List[ProducerRequest] = Nil
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(NoCompressionCodec,
+ new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+ messages += topic -> set
+ produceList ::= new ProducerRequest(topic, 0, set)
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+ producer.multiSend(produceList.toArray)
+
+ for (messageSet <- messages.values)
+ messageSet.getBuffer.rewind
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(fetches: _*)
+ for((topic, resp) <- topics.zip(response.toList))
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+
+ def testMultiProduceWithCompression() {
+ // send some messages
+ val topics = List("test1", "test2", "test3")
+ val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+ val fetches = new mutable.ArrayBuffer[FetchRequest]
+ var produceList: List[ProducerRequest] = Nil
+ for(topic <- topics) {
+ val set = new ByteBufferMessageSet(DefaultCompressionCodec,
+ new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+ messages += topic -> set
+ produceList ::= new ProducerRequest(topic, 0, set)
+ fetches += new FetchRequest(topic, 0, 0, 10000)
+ }
+ producer.multiSend(produceList.toArray)
+
+ for (messageSet <- messages.values)
+ messageSet.getBuffer.rewind
+
+ // wait a bit for produced message to be available
+ Thread.sleep(200)
+ val response = consumer.multifetch(fetches: _*)
+ for((topic, resp) <- topics.zip(response.toList))
+ TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import kafka.consumer.SimpleConsumer
+import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.producer.{SyncProducerConfig, SyncProducer}
+
+trait ProducerConsumerTestHarness extends JUnit3Suite {
+
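+ // abstract: the concrete suite supplies the broker port to connect to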
+ val port: Int
+ val host = "localhost"
+ var producer: SyncProducer = null
+ var consumer: SimpleConsumer = null
+
+ override def setUp() {
+ val props = new Properties()
+ props.put("host", host)
+ props.put("port", port.toString)
+ props.put("buffer.size", "65536")
+ props.put("connect.timeout.ms", "100000")
+ props.put("reconnect.interval", "10000")
+ producer = new SyncProducer(new SyncProducerConfig(props))
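+ // SimpleConsumer is constructed with host, port, socket timeout (ms) and buffer size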
+ consumer = new SimpleConsumer(host,
+ port,
+ 1000000,
+ 64*1024)
+ super.setUp
+ }
+
+ override def tearDown() {
+ super.tearDown
+ producer.close()
+ consumer.close()
+ }
+}
Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Mon Aug 1 23:41:24 2011
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.consumer
+
+import junit.framework.Assert._
+import kafka.zk.ZooKeeperTestHarness
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import scala.collection._
+import kafka.utils.Utils
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.scalatest.junit.JUnit3Suite
+import scala.collection.JavaConversions._
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream, ConsumerTimeoutException}
+import javax.management.NotCompliantMBeanException
+import org.apache.log4j.{Level, Logger}
+import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, CompressionCodec, Message}
+
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
+ private val logger = Logger.getLogger(getClass())
+
+ val zookeeperConnect = TestZKUtils.zookeeperConnect
+ val zkConnect = zookeeperConnect
+ val numNodes = 2
+ val numParts = 2
+ val topic = "topic1"
+ val configs =
+ for(props <- TestUtils.createBrokerConfigs(numNodes))
+ yield new KafkaConfig(props) {
+ override val enableZookeeper = true
+ override val numPartitions = numParts
+ override val zkConnect = zookeeperConnect
+ }
+ val group = "group1"
+ val consumer0 = "consumer0"
+ val consumer1 = "consumer1"
+ val consumer2 = "consumer2"
+ val consumer3 = "consumer3"
+ val nMessages = 2
+
+ def testBasic() {
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ // test consumer timeout logic
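+ // (nothing has been produced yet, so the stream iterator should block and then
+ // throw ConsumerTimeoutException once consumerTimeoutMs (200 ms here) elapses)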
+ val consumerConfig0 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+ override val consumerTimeoutMs = 200
+ }
+ val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
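+ // numNodes*numParts/2 = 2 streams over the cluster's 4 partitions, so each
+ // stream should end up owning two partitions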
+ val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+ try {
+ getMessages(nMessages*2, topicMessageStreams0)
+ fail("should get an exception")
+ }
+ catch {
+ case e: ConsumerTimeoutException => // this is ok
+ case e => throw e
+ }
+ zkConsumerConnector0.shutdown
+
+ // send some messages to each broker
+ val sentMessages1 = sendMessages(nMessages, "batch1")
+ // create a consumer
+ val consumerConfig1 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+ val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+ val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+ assertEquals(sentMessages1, receivedMessages1)
+ // commit consumed offsets
+ zkConsumerConnector1.commitOffsets
+
+ // create a consumer
+ val consumerConfig2 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+ val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+ val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+ // send some messages to each broker
+ val sentMessages2 = sendMessages(nMessages, "batch2")
+ Thread.sleep(200)
+ val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+ val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sentMessages2, receivedMessages2)
+
+ // create a consumer with empty map
+ val consumerConfig3 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer3))
+ val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+ val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(toJavaMap(new mutable.HashMap[String, Int]()))
+ // send some messages to each broker
+ Thread.sleep(200)
+ val sentMessages3 = sendMessages(nMessages, "batch3")
+ Thread.sleep(200)
+ val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+ val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sentMessages3, receivedMessages3)
+
+ zkConsumerConnector1.shutdown
+ zkConsumerConnector2.shutdown
+ zkConsumerConnector3.shutdown
+ logger.info("all consumer connectors stopped")
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testCompression() {
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ // test consumer timeout logic
+ val consumerConfig0 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+ override val consumerTimeoutMs = 200
+ }
+ val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+ val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+ try {
+ getMessages(nMessages*2, topicMessageStreams0)
+ fail("should get an exception")
+ }
+ catch {
+ case e: ConsumerTimeoutException => // this is ok
+ case e => throw e
+ }
+ zkConsumerConnector0.shutdown
+
+ // send some messages to each broker
+ val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
+ // create a consumer
+ val consumerConfig1 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+ val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+ val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+ assertEquals(sentMessages1, receivedMessages1)
+ // commit consumed offsets
+ zkConsumerConnector1.commitOffsets
+
+ // create a consumer
+ val consumerConfig2 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+ val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+ val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+ // send some messages to each broker
+ val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
+ Thread.sleep(200)
+ val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+ val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sentMessages2, receivedMessages2)
+
+ // create a consumer with empty map
+ val consumerConfig3 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer3))
+ val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+ val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(toJavaMap(new mutable.HashMap[String, Int]()))
+ // send some messages to each broker
+ Thread.sleep(200)
+ val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec)
+ Thread.sleep(200)
+ val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+ val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+ val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sentMessages3, receivedMessages3)
+
+ zkConsumerConnector1.shutdown
+ zkConsumerConnector2.shutdown
+ zkConsumerConnector3.shutdown
+ logger.info("all consumer connectors stopped")
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def testCompressionSetConsumption() {
+ val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+ requestHandlerLogger.setLevel(Level.FATAL)
+
+ // shutdown one server
+ servers.last.shutdown
+ Thread.sleep(500)
+
+ // send some messages to each broker
+ val sentMessages = sendMessages(configs.head, 200, "batch1", DefaultCompressionCodec)
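+ // 200 messages per partition x numParts (2) = 400 messages in total, which is
+ // the count the reconnecting consumer expects further down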
+ // test consumer timeout logic
+ val consumerConfig0 = new ConsumerConfig(
+ TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+ override val consumerTimeoutMs = 5000
+ }
+ val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+ val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> 1)))
+ getMessages(100, topicMessageStreams0)
+ zkConsumerConnector0.shutdown
+ // at this point only part of the compressed message set has been consumed,
+ // so both the consumed offset and the fetched offset should still be 0
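+ // (a compressed message set is checkpointed as a unit: the offset only moves
+ // past the set once every message in it has been handed out, so the
+ // reconnecting consumer below re-fetches from 0 and should see all 400 messages)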
+ val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
+ val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> 1)))
+ val receivedMessages = getMessages(400, topicMessageStreams1)
+ val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
+ val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
+ assertEquals(sortedSentMessages, sortedReceivedMessages)
+ zkConsumerConnector1.shutdown
+
+ requestHandlerLogger.setLevel(Level.ERROR)
+ }
+
+ def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message] = {
+ var messages: List[Message] = Nil
+ val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port))
+ for (partition <- 0 until numParts) {
+ val ms = 0.until(messagesPerNode).map(x =>
+ new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
+ val mSet = new ByteBufferMessageSet(compressionCodec = compressed, messages = getMessageList(ms: _*))
+ for (message <- ms)
+ messages ::= message
+ producer.send(topic, partition, mSet)
+ }
+ producer.close()
+ messages
+ }
+
+ def sendMessages(messagesPerNode: Int, header: String, compressed: CompressionCodec = NoCompressionCodec): List[Message] = {
+ var messages: List[Message] = Nil
+ for(conf <- configs) {
+ messages ++= sendMessages(conf, messagesPerNode, header, compressed)
+ }
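+ // arrival order across brokers and partitions is nondeterministic, so sort by
+ // checksum to give both sides a canonical order for comparison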
+ messages.sortWith((s,t) => s.checksum < t.checksum)
+ }
+
+ def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream]])
+ : List[Message] = {
+ var messages: List[Message] = Nil
+ val topicMessageStreams = asMap(jTopicMessageStreams)
+ for ((topic, messageStreams) <- topicMessageStreams) {
+ for (messageStream <- messageStreams) {
+ val iterator = messageStream.iterator
+ for (i <- 0 until nMessagesPerThread) {
+ assertTrue(iterator.hasNext)
+ val message = iterator.next
+ messages ::= message
+ logger.debug("received message: " + Utils.toString(message.payload, "UTF-8"))
+ }
+ }
+ }
+ messages.sortWith((s,t) => s.checksum < t.checksum)
+ }
+
+ def testJMX() {
+ val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+ try {
+ Consumer.createJavaConsumerConnector(consumerConfig)
+ } catch {
+ case e: NotCompliantMBeanException => fail("Should not fail with NotCompliantMBeanException")
+ }
+ }
+
+ private def getMessageList(messages: Message*): java.util.List[Message] = {
+ val messageList = new java.util.ArrayList[Message]()
+ messages.foreach(m => messageList.add(m))
+ messageList
+ }
+
+ private def toJavaMap(scalaMap: Map[String, Int]): java.util.Map[String, java.lang.Integer] = {
+ val javaMap = new java.util.HashMap[String, java.lang.Integer]()
+ scalaMap.foreach(m => javaMap.put(m._1, m._2.asInstanceOf[java.lang.Integer]))
+ javaMap
+ }
+}