You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2011/08/02 01:42:17 UTC

svn commit: r1152970 [21/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...

Added: incubator/kafka/trunk/core/src/main/scala/kafka/utils/package.html
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/utils/package.html?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/utils/package.html (added)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/utils/package.html Mon Aug  1 23:41:24 2011
@@ -0,0 +1 @@
+Utility functions.
\ No newline at end of file

Added: incubator/kafka/trunk/core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/resources/log4j.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/resources/log4j.properties (added)
+++ incubator/kafka/trunk/core/src/test/resources/log4j.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,9 @@
+log4j.rootLogger=OFF, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Kafka logging is switched off for tests; change OFF to DEBUG to turn on all our debugging info
+log4j.logger.kafka=OFF
+

Added: incubator/kafka/trunk/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/core/src/test/resources/test-kafka-logs/MagicByte0-0/00000000000000000000.kafka
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/core/src/test/scala/other/kafka.log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka.log4j.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka.log4j.properties (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka.log4j.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,8 @@
+log4j.rootLogger=INFO, KAFKA
+
+log4j.appender.KAFKA=kafka.log4j.KafkaAppender
+
+log4j.appender.KAFKA.Port=9092
+log4j.appender.KAFKA.Host=localhost
+log4j.appender.KAFKA.Topic=test-logger
+log4j.appender.KAFKA.Serializer=kafka.AppenderStringSerializer

Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/DeleteZKPath.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,44 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka
+
+import consumer.ConsumerConfig
+import utils.{StringSerializer, ZkUtils, Utils}
+import org.I0Itec.zkclient.ZkClient
+
+/**
+ * Command-line utility that recursively deletes a ZooKeeper path.
+ *
+ * Arguments: a consumer properties file (loaded into a ConsumerConfig to obtain the
+ * ZooKeeper connection string and timeouts) followed by the path to delete.
+ */
+object DeleteZKPath {
+  def main(args: Array[String]) {
+    if(args.length < 2) {
+      println("USAGE: " + DeleteZKPath.getClass.getName + " consumer.properties zk_path")
+      System.exit(1)
+    }
+
+    val config = new ConsumerConfig(Utils.loadProps(args(0)))
+    val zkPath = args(1)
+
+    val zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
+      StringSerializer)
+
+    try {
+      ZkUtils.deletePathRecursive(zkClient, zkPath)
+      System.out.println(zkPath + " is deleted")
+    } catch {
+      case e: Exception =>
+        // printStackTrace() returns Unit, so it has to be called on its own rather than
+        // concatenated into the message (which used to print "Path not deleted ()").
+        System.err.println("Path not deleted: " + e)
+        e.printStackTrace()
+    } finally {
+      // close the client whether or not the delete succeeded
+      zkClient.close()
+    }
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestKafkaAppender.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka
+
+import message.Message
+import org.apache.log4j.{Logger, PropertyConfigurator}
+import serializer.Encoder
+
+/**
+ * Manual test driver for a log4j configuration: configures log4j from the properties
+ * file given as the first argument and then logs "test" ten times at INFO level, so
+ * that whatever appenders that file declares receive the events.
+ */
+object TestKafkaAppender {
+
+  private val logger = Logger.getLogger(TestKafkaAppender.getClass)
+
+  def main(args:Array[String]) {
+
+    // the log4j configuration file is mandatory
+    if(args.length < 1) {
+      println("USAGE: " + TestKafkaAppender.getClass.getName + " log4j_config")
+      System.exit(1)
+    }
+
+    try {
+      PropertyConfigurator.configure(args(0))
+    } catch {
+      // any configuration failure is fatal: report it, dump the trace and exit
+      case e: Exception => {
+        System.err.println("KafkaAppender could not be initialized ! Exiting..")
+        e.printStackTrace()
+        System.exit(1)
+      }
+    }
+
+    (1 to 10).foreach(_ => logger.info("test"))
+  }
+}
+
+/**
+ * Encoder that wraps the bytes of an event's string form in a Message.
+ *
+ * A String event is encoded exactly as before (getBytes with the platform default
+ * charset). Any other event, including null, is encoded through String.valueOf instead
+ * of failing on an unchecked cast to String.
+ */
+class AppenderStringSerializer extends Encoder[AnyRef] {
+  def toMessage(event: AnyRef):Message = event match {
+    case text: String => new Message(text.getBytes)
+    case other => new Message(String.valueOf(other).getBytes)
+  }
+}
+

Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import java.net.URI
+import java.util.Arrays.asList
+import java.io._
+import java.nio._
+import java.nio.channels._
+import joptsimple._
+
+/**
+ * Micro-benchmark of sequential file write throughput.
+ *
+ * Writes --bytes bytes in chunks of --size bytes, rotating round-robin over --files
+ * temporary files (default 1), and prints the observed rate in MB per second.
+ */
+object TestLinearWriteSpeed {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val bytesOpt = parser.accepts("bytes", "REQUIRED: The number of bytes to write.")
+                           .withRequiredArg
+                           .describedAs("num_bytes")
+                           .ofType(classOf[java.lang.Integer])
+    val sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.")
+                           .withRequiredArg
+                           .describedAs("num_bytes")
+                           .ofType(classOf[java.lang.Integer])
+    val filesOpt = parser.accepts("files", "The number of files.")
+                           .withRequiredArg
+                           .describedAs("num_files")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1)
+
+    val options = parser.parse(args : _*)
+
+    // --files carries a default, so only --bytes and --size must appear on the command
+    // line; options.has() is false for an option that merely falls back to its default.
+    for(arg <- List(bytesOpt, sizeOpt)) {
+      if(!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val bytesToWrite = options.valueOf(bytesOpt).intValue
+    val bufferSize = options.valueOf(sizeOpt).intValue
+    val numFiles = options.valueOf(filesOpt).intValue
+    // both values are used as divisors below, so reject zero or negative input up front
+    if(bufferSize <= 0 || numFiles <= 0) {
+      System.err.println("size and files must both be positive")
+      System.exit(1)
+    }
+
+    // fill the write buffer with an arbitrary constant byte
+    val buffer = ByteBuffer.allocate(bufferSize)
+    while(buffer.hasRemaining)
+      buffer.put(123.toByte)
+
+    val channels = new Array[FileChannel](numFiles)
+    try {
+      for(i <- 0 until numFiles) {
+        val file = File.createTempFile("kafka-test", ".dat")
+        file.deleteOnExit()
+        channels(i) = new RandomAccessFile(file, "rw").getChannel()
+      }
+
+      val begin = System.currentTimeMillis
+      for(i <- 0 until bytesToWrite / bufferSize) {
+        // rewind so the same buffer content is written again on every iteration
+        buffer.rewind()
+        channels(i % numFiles).write(buffer)
+      }
+      val ellapsedSecs = (System.currentTimeMillis - begin) / 1000.0
+      System.out.println(bytesToWrite / (1024 * 1024 * ellapsedSecs) + " MB per sec")
+    } finally {
+      // close every channel that was actually opened
+      for(channel <- channels if channel != null)
+        channel.close()
+    }
+  }
+
+}

Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestLogPerformance.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import kafka.log._
+import kafka.message._
+import kafka.utils.{TestUtils, Utils}
+
+/**
+ * Measures append throughput of a Log.
+ *
+ * Arguments: num_messages message_size batch_size compression_codec. One random message
+ * of message_size bytes is repeated batch_size times in a message set, and
+ * num_messages / batch_size such sets are appended to a log in a temporary directory,
+ * which is removed at the end.
+ */
+object TestLogPerformance {
+
+  def main(args: Array[String]): Unit = {
+    if(args.length < 4)
+      Utils.croak("USAGE: java " + getClass().getName() + " num_messages message_size batch_size compression_codec")
+    val numMessages = args(0).toInt
+    val messageSize = args(1).toInt
+    val batchSize = args(2).toInt
+    val compressionCodec = CompressionCodec.getCompressionCodec(args(3).toInt)
+    val dir = TestUtils.tempDir()
+    val log = new Log(dir, 50*1024*1024, 5000000, false)
+    val bytes = new Array[Byte](messageSize)
+    new java.util.Random().nextBytes(bytes)
+    val message = new Message(bytes)
+    // every slot of the batch holds the same message instance
+    val messages = new Array[Message](batchSize)
+    for(i <- 0 until batchSize)
+      messages(i) = message
+    val messageSet = new ByteBufferMessageSet(compressionCodec = compressionCodec, messages = messages: _*)
+    val numBatches = numMessages / batchSize
+    val start = System.currentTimeMillis()
+    for(i <- 0 until numBatches)
+      log.append(messageSet)
+    log.close()
+    val ellapsed = (System.currentTimeMillis() - start) / 1000.0
+    // Count only the messages actually appended (numMessages is rounded down to whole
+    // batches) and widen to Long first so large runs cannot overflow.
+    // NOTE(review): this is the per-message entry size times the message count, not the
+    // on-disk size when a compression codec is used -- confirm that is the intended metric.
+    val entrySize = MessageSet.entrySize(message)
+    val writtenBytes = entrySize.toLong * numBatches * batchSize
+    println("message size = " + entrySize)
+    println("MB/sec: " + writtenBytes / ellapsed / (1024.0 * 1024.0))
+    Utils.rm(dir)
+  }
+
+}

Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestTruncate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestTruncate.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestTruncate.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestTruncate.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import java.io._
+import java.nio._
+
+/*
+ * This code tests the correct function of java's FileChannel.truncate, which does not
+ * work on some platforms. It writes 12 bytes to a temp file, truncates the channel to
+ * 4 bytes, and prints the channel position before and after for manual inspection.
+ */
+object TestTruncate {
+
+  def main(args: Array[String]): Unit = {
+    val name = File.createTempFile("kafka", ".test")
+    name.deleteOnExit()
+    val file = new RandomAccessFile(name, "rw").getChannel()
+    try {
+      val buffer = ByteBuffer.allocate(12)
+      buffer.putInt(4).putInt(4).putInt(4)
+      // rewind so the write starts from the first of the three ints
+      buffer.rewind()
+      file.write(buffer)
+      println("position prior to truncate: " + file.position)
+      file.truncate(4)
+      println("position after truncate to 4: " + file.position)
+    } finally {
+      // close the channel so the temp file is no longer held open when it is deleted on exit
+      file.close()
+    }
+  }
+
+}

Added: incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka
+
+import consumer._
+import utils.Utils
+import java.util.concurrent.CountDownLatch
+
+/**
+ * Manual test that creates a consumer connector for one topic, starts one printing
+ * thread per message stream, and registers a shutdown hook that stops the connector
+ * and waits for the threads.
+ *
+ * Arguments: consumer.properties topic latest
+ */
+object TestZKConsumerOffsets {
+  def main(args: Array[String]): Unit = {
+    // all three arguments are indexed below, so require them here instead of failing
+    // later with an ArrayIndexOutOfBoundsException
+    if(args.length < 3) {
+      println("USAGE: " + TestZKConsumerOffsets.getClass.getName + " consumer.properties topic latest")
+      System.exit(1)
+    }
+    println("Starting consumer...")
+    val topic = args(1)
+    // NOTE(review): the third argument is read but never applied; the reset policy set
+    // below is hard-coded to "largest" -- confirm whether it was meant to be configurable
+    val autoOffsetReset = args(2)
+    val props = Utils.loadProps(args(0))
+    props.put("autooffset.reset", "largest")
+
+    val config = new ConsumerConfig(props)
+    val consumerConnector: ConsumerConnector = Consumer.create(config)
+    // request a single stream for the topic
+    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
+    var threadList = List[ConsumerThread]()
+    for ((topic, streamList) <- topicMessageStreams)
+      for (stream <- streamList)
+        threadList ::= new ConsumerThread(stream)
+
+    for (thread <- threadList)
+      thread.start
+
+    // attach shutdown handler to catch control-c
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run() = {
+        consumerConnector.shutdown
+        threadList.foreach(_.shutdown)
+        println("consumer threads shutted down")
+      }
+    })
+  }
+}
+
+/**
+ * Thread that prints the payload of every message from the given stream as a UTF-8
+ * string. shutdown() blocks the caller until run() has finished.
+ */
+private class ConsumerThread(stream: KafkaMessageStream) extends Thread {
+  val shutdownLatch = new CountDownLatch(1)
+
+  override def run() {
+    println("Starting consumer thread..")
+    try {
+      for (message <- stream) {
+        println("consumed: " + Utils.toString(message.payload, "UTF-8"))
+      }
+    } finally {
+      // always release the latch; otherwise shutdown() would wait forever if iterating
+      // the stream throws
+      shutdownLatch.countDown
+      println("thread shutdown !" )
+    }
+  }
+
+  /** Waits until run() has released the latch. */
+  def shutdown() {
+    shutdownLatch.await
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/TopicCountTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import junit.framework.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
+import kafka.cluster.Partition
+
+
+/** Unit tests for building a TopicCount from JSON and for Partition equality. */
+class TopicCountTest extends JUnitSuite {
+
+  /**
+   * A TopicCount parsed from a JSON string must equal one built directly from the
+   * equivalent map, and must also equal the result of parsing its own toJsonString.
+   */
+  @Test
+  def testBasic() {
+    val consumerId = "conusmer1"
+    val parsed = TopicCount.constructTopicCount(consumerId, """{ "topic1" : 2, "topic2" : 3 }""")
+    val expected = new TopicCount(consumerId, Map("topic1" -> 2, "topic2" -> 3))
+    assertTrue(expected == parsed)
+
+    // round trip: serialize the expected value and parse it back
+    val reparsed = TopicCount.constructTopicCount(consumerId, expected.toJsonString)
+    assertTrue(expected == reparsed)
+  }
+
+  /** Partitions built from the same two arguments are equal; a different second argument makes them unequal. */
+  @Test
+  def testPartition() {
+    val reference = new Partition(10, 0)
+    assertTrue(new Partition(10, 0) == reference)
+    assertTrue(new Partition(10, 1) != reference)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import junit.framework.Assert._
+import kafka.zk.ZooKeeperTestHarness
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import scala.collection._
+import kafka.utils.Utils
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.scalatest.junit.JUnit3Suite
+import org.apache.log4j.{Level, Logger}
+import kafka.message._
+
+/**
+ * Tests for ZookeeperConsumerConnector: consumer timeout, consumption by several
+ * consumers in one group, and consumption of compressed message sets.
+ *
+ * The suite mixes in KafkaServerTestHarness and ZooKeeperTestHarness; presumably these
+ * start the brokers described by `configs` (exposed as `servers`) and a ZooKeeper
+ * instance -- confirm against the harness sources.
+ */
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
+  private val logger = Logger.getLogger(getClass())
+
+  val zookeeperConnect = TestZKUtils.zookeeperConnect
+  val zkConnect = zookeeperConnect
+  val numNodes = 2
+  val numParts = 2
+  val topic = "topic1"
+  // one broker config per node, each with zookeeper enabled and numParts partitions
+  val configs =
+    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val enableZookeeper = true
+      override val numPartitions = numParts
+      override val zkConnect = zookeeperConnect
+    }
+  val group = "group1"
+  val consumer0 = "consumer0"
+  val consumer1 = "consumer1"
+  val consumer2 = "consumer2"
+  val consumer3 = "consumer3"
+  // number of messages sent to each partition of each broker per batch
+  val nMessages = 2
+
+  /**
+   * Uncompressed consumption: first checks that a consumer with a 200 ms timeout gets a
+   * ConsumerTimeoutException when nothing has been sent, then sends three batches and
+   * checks what is received as a second and a third (empty topic map) consumer are added
+   * to the group.
+   */
+  def testBasic() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    // test consumer timeout logic
+    val consumerConfig0 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+      override val consumerTimeoutMs = 200
+    }
+    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    try {
+      getMessages(nMessages*2, topicMessageStreams0)
+      fail("should get an exception")
+    }
+    catch {
+      // any other exception (including the failure raised by fail) simply propagates
+      case e: ConsumerTimeoutException => // this is ok
+        println("This is ok")
+    }
+    zkConsumerConnector0.shutdown
+
+    // send some messages to each broker
+    val sentMessages1 = sendMessages(nMessages, "batch1")
+    // create a consumer
+    val consumerConfig1 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+    assertEquals(sentMessages1, receivedMessages1)
+    // commit consumed offsets
+    zkConsumerConnector1.commitOffsets
+
+    // create a consumer
+    val consumerConfig2 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    // send some messages to each broker
+    val sentMessages2 = sendMessages(nMessages, "batch2")
+    Thread.sleep(200)
+    // the second batch is read through the streams of both consumers and merged
+    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+    val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+    assertEquals(sentMessages2, receivedMessages2)
+
+    // create a consumer with empty map
+    val consumerConfig3 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer3))
+    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+    // send some messages to each broker
+    Thread.sleep(200)
+    val sentMessages3 = sendMessages(nMessages, "batch3")
+    Thread.sleep(200)
+    // the third batch is still read through the streams of consumers 1 and 2
+    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+    val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+    assertEquals(sentMessages3, receivedMessages3)
+
+    zkConsumerConnector1.shutdown
+    zkConsumerConnector2.shutdown
+    zkConsumerConnector3.shutdown
+    logger.info("all consumer connectors stopped")
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
+  /**
+   * Same scenario as testBasic, but every batch is sent with DefaultCompressionCodec.
+   */
+  def testCompression() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    // test consumer timeout logic
+    val consumerConfig0 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+      override val consumerTimeoutMs = 200
+    }
+    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    try {
+      getMessages(nMessages*2, topicMessageStreams0)
+      fail("should get an exception")
+    }
+    catch {
+      // any other exception (including the failure raised by fail) simply propagates
+      case e: ConsumerTimeoutException => // this is ok
+        println("This is ok")
+    }
+    zkConsumerConnector0.shutdown
+
+    println("Sending messages for 1st consumer")
+    // send some messages to each broker
+    val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
+    // create a consumer
+    val consumerConfig1 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+    assertEquals(sentMessages1, receivedMessages1)
+    // commit consumed offsets
+    zkConsumerConnector1.commitOffsets
+
+    println("Sending more messages for 2nd consumer")
+    // create a consumer
+    val consumerConfig2 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+    val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+    val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(Predef.Map(topic -> numNodes*numParts/2))
+    // send some messages to each broker
+    val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
+    Thread.sleep(200)
+    // the second batch is read through the streams of both consumers and merged
+    val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+    val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+    assertEquals(sentMessages2, receivedMessages2)
+
+    // create a consumer with empty map
+    println("Sending more messages for 3rd consumer")
+    val consumerConfig3 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer3))
+    val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+    val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(new mutable.HashMap[String, Int]())
+    // send some messages to each broker
+    Thread.sleep(200)
+    val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec)
+    Thread.sleep(200)
+    // the third batch is still read through the streams of consumers 1 and 2
+    val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+    val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+    val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+    assertEquals(sentMessages3, receivedMessages3)
+
+    zkConsumerConnector1.shutdown
+    zkConsumerConnector2.shutdown
+    zkConsumerConnector3.shutdown
+    logger.info("all consumer connectors stopped")
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
+  /**
+   * Consumes only part of a compressed message set, shuts the consumer down, and then
+   * checks that a new connector with the same config receives every sent message again.
+   */
+  def testCompressionSetConsumption() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+
+    // shutdown one server
+    servers.last.shutdown
+    Thread.sleep(500)
+
+    // send 200 compressed messages to each partition of the first broker only
+    val sentMessages = sendMessages(configs.head, 200, "batch1", DefaultCompressionCodec)
+    // test consumer timeout logic
+    val consumerConfig0 = new ConsumerConfig(
+      TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+      override val consumerTimeoutMs = 5000
+    }
+    val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(Predef.Map(topic -> 1))
+    getMessages(100, topicMessageStreams0)
+    zkConsumerConnector0.shutdown
+    // at this point, only some part of the message set was consumed. So consumed offset should still be 0
+    // also fetched offset should be 0
+    val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
+    val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+    val receivedMessages = getMessages(400, topicMessageStreams1)
+    val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
+    val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
+    assertEquals(sortedSentMessages, sortedReceivedMessages)
+    zkConsumerConnector1.shutdown
+
+    requestHandlerLogger.setLevel(Level.ERROR)
+  }
+
+  /**
+   * Sends messagesPerNode messages to every partition of the broker described by conf,
+   * one message set per partition, using the given compression codec. Each payload is
+   * header + brokerId + "-" + partition + "-" + index.
+   *
+   * @return the messages that were sent, most recently created first
+   */
+  def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compression: CompressionCodec): List[Message]= {
+    var messages: List[Message] = Nil
+    val producer = TestUtils.createProducer("localhost", conf.port)
+    for (partition <- 0 until numParts) {
+      val ms = 0.until(messagesPerNode).map(x =>
+        new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
+      val mSet = new ByteBufferMessageSet(compressionCodec = compression, messages = ms: _*)
+      for (message <- ms)
+        messages ::= message
+      producer.send(topic, partition, mSet)
+    }
+    producer.close()
+    messages
+  }
+
+  /**
+   * Sends messagesPerNode messages to every partition of every broker in configs.
+   *
+   * @return all sent messages, sorted by checksum
+   */
+  def sendMessages(messagesPerNode: Int, header: String, compression: CompressionCodec = NoCompressionCodec): List[Message]= {
+    var messages: List[Message] = Nil
+    for(conf <- configs) {
+      messages ++= sendMessages(conf, messagesPerNode, header, compression)
+    }
+    messages.sortWith((s,t) => s.checksum < t.checksum)
+  }
+
+  /**
+   * Reads exactly nMessagesPerThread messages from every stream of every topic,
+   * asserting before each read that the stream has another message.
+   *
+   * @return all messages read, sorted by checksum
+   */
+  def getMessages(nMessagesPerThread: Int, topicMessageStreams: Map[String,List[KafkaMessageStream]]): List[Message]= {
+    var messages: List[Message] = Nil
+    for ((topic, messageStreams) <- topicMessageStreams) {
+      for (messageStream <- messageStreams) {
+        val iterator = messageStream.iterator
+        for (i <- 0 until nMessagesPerThread) {
+          assertTrue(iterator.hasNext)
+          val message = iterator.next
+          messages ::= message
+          logger.debug("received message: " + Utils.toString(message.payload, "UTF-8"))
+        }
+      }
+    }
+    messages.sortWith((s,t) => s.checksum < t.checksum)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,223 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import junit.framework.Assert._
+import kafka.zk.ZooKeeperTestHarness
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.atomic.AtomicInteger
+import kafka.utils.ZKGroupTopicDirs
+import kafka.consumer.{ConsumerTimeoutException, ConsumerConfig, ConsumerConnector, Consumer}
+import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{TestUtils, TestZKUtils}
+
+/**
+ * Tests of the consumer "autooffset.reset" property. Every test produces numMessages
+ * messages, writes either largeOffset or smallOffset as the stored consumer offset for
+ * "0-0" in ZooKeeper, and then counts how many messages a ZooKeeper based consumer receives.
+ */
+class AutoOffsetResetTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  val zkConnect = TestZKUtils.zookeeperConnect
+  val topic = "test_topic"
+  val group = "default_group"
+  val testConsumer = "consumer"
+  val brokerPort = 9892
+  val kafkaConfig = new KafkaConfig(TestUtils.createBrokerConfig(0, brokerPort))
+  var kafkaServer : KafkaServer = null
+  val numMessages = 10
+  val largeOffset = 10000
+  val smallOffset = -1
+  
+  private val logger = Logger.getLogger(getClass())
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+  override def setUp() {
+    super.setUp()
+    kafkaServer = TestUtils.createServer(kafkaConfig)
+
+    // only let FATAL request handler messages through while the test runs
+    requestHandlerLogger.setLevel(Level.FATAL)
+  }
+
+  override def tearDown() {
+    // put the request handler logger back to ERROR
+    requestHandlerLogger.setLevel(Level.ERROR)
+    try {
+      kafkaServer.shutdown
+    } finally {
+      // always tear the parent harness down, even if the broker shutdown throws
+      super.tearDown
+    }
+  }
+
+  /**
+   * Stored offset is largeOffset and the reset policy is "smallest":
+   * the consumer is expected to receive all numMessages messages.
+   */
+  def testEarliestOffsetResetForward() {
+    val consumerConfig = produceAndStoreOffset("smallest", largeOffset)
+    Thread.sleep(500)
+    assertEquals(numMessages, countConsumedMessages(consumerConfig))
+  }
+
+  /**
+   * Stored offset is smallOffset and the reset policy is "smallest":
+   * the consumer is expected to receive all numMessages messages.
+   */
+  def testEarliestOffsetResetBackward() {
+    val consumerConfig = produceAndStoreOffset("smallest", smallOffset)
+    assertEquals(numMessages, countConsumedMessages(consumerConfig))
+  }
+
+  /**
+   * Stored offset is largeOffset and the reset policy is "largest":
+   * the consumer is expected to receive nothing.
+   */
+  def testLatestOffsetResetForward() {
+    val consumerConfig = produceAndStoreOffset("largest", largeOffset)
+    assertEquals(0, countConsumedMessages(consumerConfig))
+  }
+
+  /**
+   * Produces numMessages messages to the topic, then writes storedOffset as the consumer
+   * offset for "0-0" in ZooKeeper.
+   *
+   * @param resetPolicy value for the "autooffset.reset" consumer property
+   * @param storedOffset offset to store in ZooKeeper
+   * @return the consumer config (reset policy plus a 2000 ms consumer timeout) to consume with
+   */
+  private def produceAndStoreOffset(resetPolicy: String, storedOffset: Int): ConsumerConfig = {
+    val producer = TestUtils.createProducer("localhost", brokerPort)
+    try {
+      for(i <- 0 until numMessages)
+        producer.send(topic, TestUtils.singleMessageSet("test".getBytes()))
+    } finally {
+      // the producer is not needed once the messages have been sent
+      producer.close()
+    }
+
+    val dirs = new ZKGroupTopicDirs(group, topic)
+    val consumerProps = TestUtils.createConsumerProperties(zkConnect, group, testConsumer)
+    consumerProps.put("autooffset.reset", resetPolicy)
+    consumerProps.put("consumer.timeout.ms", "2000")
+    val consumerConfig = new ConsumerConfig(consumerProps)
+
+    TestUtils.updateConsumerOffset(consumerConfig, dirs.consumerOffsetDir + "/" + "0-0", storedOffset)
+    logger.info("Updated consumer offset to " + storedOffset)
+    consumerConfig
+  }
+
+  /**
+   * Creates a consumer connector with one stream for the topic, drains every stream on its
+   * own thread, waits up to 2000 ms per thread and returns the number of messages seen so far.
+   * The connector is shut down before returning, also when something fails.
+   */
+  private def countConsumedMessages(consumerConfig: ConsumerConfig): Int = {
+    val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
+    try {
+      val messageStreams = consumerConnector.createMessageStreams(Map(topic -> 1))
+      val nMessages = new AtomicInteger(0)
+      var threadList = List[Thread]()
+      for ((_, streamList) <- messageStreams)
+        for (i <- 0 until streamList.length)
+          threadList ::= new Thread("kafka-zk-consumer-" + i) {
+            override def run() {
+              try {
+                for (message <- streamList(i))
+                  nMessages.incrementAndGet
+              }
+              catch {
+                // a timeout or an interrupt simply ends this consumer thread
+                case te: ConsumerTimeoutException => logger.info("Consumer thread timing out..")
+                case _: InterruptedException =>
+                case _: ClosedByInterruptException =>
+              }
+            }
+          }
+
+      for (thread <- threadList)
+        thread.start
+      for (thread <- threadList)
+        thread.join(2000)
+
+      logger.info("Asserting...")
+      nMessages.get
+    } finally {
+      consumerConnector.shutdown
+    }
+  }
+
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2010 LinkedIn
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import kafka.server.{KafkaServer, KafkaConfig}
+import org.scalatest.junit.JUnit3Suite
+import org.apache.log4j.Logger
+import java.util.Properties
+import kafka.consumer.SimpleConsumer
+import kafka.utils.{Utils, TestUtils}
+import kafka.api.{OffsetRequest, FetchRequest}
+import junit.framework.Assert._
+import java.io.File
+
+/**
+ * Starts a broker with ZooKeeper disabled whose log.dir is the "test-kafka-logs" classpath
+ * resource and checks that the "MagicByte0" topic stored there can be read with a SimpleConsumer.
+ */
+class BackwardsCompatibilityTest extends JUnit3Suite {
+
+  val topic = "MagicByte0"
+  val group = "default_group"
+  val testConsumer = "consumer"
+  val kafkaProps = new Properties
+  val host = "localhost"
+  val port = 9892
+  val loader = getClass.getClassLoader
+  val kafkaLogDir = loader.getResource("test-kafka-logs")
+  kafkaProps.put("brokerid", "12")
+  kafkaProps.put("port", port.toString)
+  kafkaProps.put("log.dir", kafkaLogDir.getPath)
+  val kafkaConfig =
+    new KafkaConfig(kafkaProps) {
+      override val enableZookeeper = false
+    }
+  var kafkaServer : KafkaServer = null
+  var simpleConsumer: SimpleConsumer = null
+
+  private val logger = Logger.getLogger(getClass())
+
+  override def setUp() {
+    super.setUp()
+    kafkaServer = TestUtils.createServer(kafkaConfig)
+    simpleConsumer = new SimpleConsumer(host, port, 1000000, 64*1024)
+  }
+
+  override def tearDown() {
+    // shut the broker down and run the parent tearDown even if an earlier step throws
+    try {
+      simpleConsumer.close
+    } finally {
+      try {
+        kafkaServer.shutdown
+      } finally {
+        super.tearDown
+      }
+    }
+  }
+
+  /**
+   * Test for reading data with magic byte 0: fetches partition 0 of the topic from offset 0
+   * up to the latest offset and expects 100 messages in total.
+   */
+  def testProtocolVersion0() {
+    val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
+    var fetchOffset: Long = 0L
+    var messageCount: Int = 0
+
+    while(fetchOffset < lastOffset(0)) {
+      val fetched = simpleConsumer.fetch(new FetchRequest(topic, 0, fetchOffset, 10000))
+      var fetchedCount = 0
+      for (m <- fetched) {
+        fetchOffset = m.offset
+        fetchedCount += 1
+      }
+      // an empty fetch would leave fetchOffset unchanged and make this loop spin forever
+      assertTrue("Fetch at offset " + fetchOffset + " returned no messages", fetchedCount > 0)
+      messageCount += fetchedCount
+    }
+    assertEquals(100, messageCount)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+import junit.framework.Assert._
+
+import kafka.cluster._
+import kafka.message._
+import kafka.server._
+import org.scalatest.junit.JUnit3Suite
+import kafka.integration.KafkaServerTestHarness
+import kafka.utils.TestUtils
+
+/**
+ * Runs a Fetcher against numNodes brokers (ZooKeeper disabled) and checks that the messages
+ * produced to each broker arrive as data chunks on the shared queue.
+ */
+class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
+
+  val numNodes = 2
+  val configs = 
+    for(props <- TestUtils.createBrokerConfigs(numNodes))
+      yield new KafkaConfig(props) {
+        override val enableZookeeper = false
+      }
+  val messages = new mutable.HashMap[Int, ByteBufferMessageSet]
+  val topic = "topic"
+  val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
+  val shutdown = ZookeeperConsumerConnector.shutdownCommand
+  val queue = new LinkedBlockingQueue[FetchedDataChunk]
+  val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
+                                                      c.brokerId,
+                                                      new Partition(c.brokerId, 0), 
+                                                      queue, 
+                                                      new AtomicLong(0), 
+                                                      new AtomicLong(0), 
+                                                      new AtomicInteger(0)))
+  
+  var fetcher: Fetcher = null
+
+  override def setUp() {
+    super.setUp
+    fetcher = new Fetcher(new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), null)
+    fetcher.initConnections(topicInfos, cluster, Set(queue))
+  }
+
+  override def tearDown() {
+    fetcher.shutdown
+    super.tearDown
+  }
+    
+  /**
+   * Two rounds of: send perNode messages to every broker, fetch them all, then check that
+   * nothing else shows up on the queue shortly afterwards.
+   */
+  def testFetcher() {
+    val perNode = 2
+    for(round <- 0 until 2) {
+      val count = sendMessages(perNode)
+      fetch(count)
+      Thread.sleep(100)
+      assertQueueEmpty()
+    }
+  }
+  
+  /** Asserts that no data chunk is currently waiting on the queue. */
+  def assertQueueEmpty(): Unit = assertEquals(0, queue.size)
+  
+  /**
+   * Sends messagesPerNode messages to every broker in configs and records the message set
+   * sent to each broker in messages.
+   *
+   * @return the total number of messages sent
+   */
+  def sendMessages(messagesPerNode: Int): Int = {
+    var count = 0
+    for(conf <- configs) {
+      val producer = TestUtils.createProducer("localhost", conf.port)
+      try {
+        val ms = 0.until(messagesPerNode).map(x => new Message((conf.brokerId * 5 + x).toString.getBytes)).toArray
+        val mSet = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = ms: _*)
+        messages += conf.brokerId -> mSet
+        producer.send(topic, mSet)
+        count += ms.size
+      } finally {
+        // close the producer even when the send fails
+        producer.close()
+      }
+    }
+    count
+  }
+  
+  /**
+   * Takes data chunks off the queue until exactly expected messages have been counted.
+   * Fails if no chunk arrives within 2 seconds, or as soon as more than expected
+   * messages have been received.
+   */
+  def fetch(expected: Int) {
+    var count = 0
+    // do/while: at least one chunk is always polled, even when expected is 0
+    do {
+      val chunk = queue.poll(2L, TimeUnit.SECONDS)
+      assertNotNull("Timed out waiting for data chunk " + (count + 1), chunk)
+      for(message <- chunk.messages)
+        count += 1
+      assertTrue("Received " + count + " messages but expected only " + expected, count <= expected)
+    } while(count < expected)
+  }
+  
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import java.util.Properties
+import junit.framework.Assert._
+import kafka.producer._
+import kafka.consumer._
+import kafka.message._
+import kafka.server._
+import kafka.utils.{Utils, TestUtils}
+import org.scalatest.junit.JUnit3Suite
+
+/**
+ * A test harness that brings up some number of broker nodes: one server is created per
+ * entry of configs before each test; after each test the servers are shut down and their
+ * log directories are removed.
+ */
+trait KafkaServerTestHarness extends JUnit3Suite {
+
+  val configs: List[KafkaConfig]
+  var servers: List[KafkaServer] = null
+
+  override def setUp() {
+    if(configs.isEmpty)
+      throw new IllegalArgumentException("Must supply at least one server config.")
+    servers = configs.map(TestUtils.createServer(_))
+    // the servers are created before the rest of the setUp chain runs
+    super.setUp
+  }
+
+  override def tearDown() {
+    super.tearDown
+    // shut every server down first, then delete the log directories
+    servers.foreach(server => server.shutdown())
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,184 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import scala.collection._
+import junit.framework.Assert._
+import kafka.common.OffsetOutOfRangeException
+import kafka.api.{ProducerRequest, FetchRequest}
+import kafka.server.{KafkaRequestHandlers, KafkaServer, KafkaConfig}
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import kafka.utils.{TestUtils, Utils}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+/**
+ * End to end tests of the primitive apis against a local server
+ */
+class LazyInitProducerTest extends JUnit3Suite with ProducerConsumerTestHarness   {
+
+  val port = 9999
+  val props = TestUtils.createBrokerConfig(0, port)
+  val config = new KafkaConfig(props) {
+                 override val enableZookeeper = false
+               }
+  val configs = List(config)
+  var servers: List[KafkaServer] = null
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers])
+
+  override def setUp() {
+    // NOTE(review): super.setUp runs BEFORE the servers are created here; going by the class
+    // name this ordering looks deliberate - confirm before reordering
+    super.setUp
+    if(configs.isEmpty)
+      throw new IllegalArgumentException("Must supply at least one server config.")
+    servers = configs.map(TestUtils.createServer(_))
+
+    // only let FATAL request handler messages through while the test runs
+    requestHandlerLogger.setLevel(Level.FATAL)    
+  }
+
+  override def tearDown() {
+    // put the request handler logger back to ERROR
+    requestHandlerLogger.setLevel(Level.ERROR)
+
+    super.tearDown    
+    servers.foreach(server => server.shutdown())
+    servers.foreach(server => Utils.rm(server.config.logDir))
+  }
+
+  /** Builds the two-message set ("a_" + topic, "b_" + topic) used by the multi-topic tests. */
+  private def messageSetFor(topic: String): ByteBufferMessageSet =
+    new ByteBufferMessageSet(NoCompressionCodec,
+                             new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+  
+  /**
+   * Sends two messages to one topic, fetches until data is available and compares it with what
+   * was sent; then checks that fetching at offset -1 raises OffsetOutOfRangeException.
+   */
+  def testProduceAndFetch() {
+    // send some messages
+    val topic = "test"
+    val sent = new ByteBufferMessageSet(NoCompressionCodec,
+                                        new Message("hello".getBytes()), new Message("there".getBytes()))
+    producer.send(topic, sent)
+    sent.getBuffer.rewind
+    // keep fetching until the fetch returns some valid bytes
+    var fetched: ByteBufferMessageSet = null
+    while(fetched == null || fetched.validBytes == 0)
+      fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+    TestUtils.checkEquals(sent.iterator, fetched.iterator)
+
+    // send an invalid offset; any other exception propagates and fails the test
+    val exceptionThrown =
+      try {
+        val fetchedWithError = consumer.fetch(new FetchRequest(topic, 0, -1, 10000))
+        fetchedWithError.iterator
+        false
+      }
+      catch {
+        case e: OffsetOutOfRangeException => true
+      }
+    assertTrue(exceptionThrown)
+  }
+
+  /**
+   * Sends a message set to each of three topics, multifetches them and compares; then checks
+   * that a multifetch at offset -1 raises OffsetOutOfRangeException.
+   */
+  def testProduceAndMultiFetch() {
+    // send some messages
+    val topics = List("test1", "test2", "test3");
+    {
+      val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+      val fetches = new mutable.ArrayBuffer[FetchRequest]
+      for(topic <- topics) {
+        val set = messageSetFor(topic)
+        messages += topic -> set
+        producer.send(topic, set)
+        set.getBuffer.rewind
+        fetches += new FetchRequest(topic, 0, 0, 10000)
+      }
+
+      // wait a bit for produced message to be available
+      Thread.sleep(200)
+      val response = consumer.multifetch(fetches: _*)
+      for((topic, resp) <- topics.zip(response.toList))
+        TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+    }
+
+    {
+      // send some invalid offsets
+      val fetches = new mutable.ArrayBuffer[FetchRequest]
+      for(topic <- topics)
+        fetches += new FetchRequest(topic, 0, -1, 10000)
+
+      // any exception other than OffsetOutOfRangeException propagates and fails the test
+      val exceptionThrown =
+        try {
+          val responses = consumer.multifetch(fetches: _*)
+          for(resp <- responses)
+            resp.iterator
+          false
+        }
+        catch {
+          case e: OffsetOutOfRangeException => true
+        }
+      assertTrue(exceptionThrown)
+    }
+  }
+
+  /** Sends one multi-send covering three topics and checks a multifetch returns what was sent. */
+  def testMultiProduce() {
+    // send some messages
+    val topics = List("test1", "test2", "test3");
+    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    var produceList: List[ProducerRequest] = Nil
+    for(topic <- topics) {
+      val set = messageSetFor(topic)
+      messages += topic -> set
+      produceList ::= new ProducerRequest(topic, 0, set)
+      fetches += new FetchRequest(topic, 0, 0, 10000)
+    }
+    producer.multiSend(produceList.toArray)
+
+    for (messageSet <- messages.values)
+      messageSet.getBuffer.rewind
+
+    // wait a bit for produced message to be available
+    Thread.sleep(200)
+    val response = consumer.multifetch(fetches: _*)
+    for((topic, resp) <- topics.zip(response.toList))
+      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+  }
+
+  /**
+   * Sends the same multi-send twice and checks that a multifetch returns every topic's
+   * messages two times over.
+   */
+  def testMultiProduceResend() {
+    // send some messages
+    val topics = List("test1", "test2", "test3");
+    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    var produceList: List[ProducerRequest] = Nil
+    for(topic <- topics) {
+      val set = messageSetFor(topic)
+      messages += topic -> set
+      produceList ::= new ProducerRequest(topic, 0, set)
+      fetches += new FetchRequest(topic, 0, 0, 10000)
+    }
+    producer.multiSend(produceList.toArray)
+
+    // resend the same multisend
+    producer.multiSend(produceList.toArray)
+
+    for (messageSet <- messages.values)
+      messageSet.getBuffer.rewind
+
+    // wait a bit for produced message to be available
+    Thread.sleep(750)
+    val response = consumer.multifetch(fetches: _*)
+    // Only the message part (m.message) of each entry is compared, not the whole entry.
+    // NOTE(review): presumably because the offsets of the resent copy differ - confirm.
+    for((topic, resp) <- topics.zip(response.toList))
+      TestUtils.checkEquals(TestUtils.stackedIterator(messages(topic).map(m => m.message).iterator,
+                                                      messages(topic).map(m => m.message).iterator),
+                            resp.map(m => m.message).iterator)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,94 @@
+package kafka.log
+
+import kafka.server.KafkaConfig
+import java.io.File
+import java.nio.ByteBuffer
+import kafka.utils.Utils
+import kafka.api.FetchRequest
+import kafka.common.InvalidMessageSizeException
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.{TestZKUtils, TestUtils}
+import kafka.consumer.{ZookeeperConsumerConnector, ConsumerConfig}
+import org.scalatest.junit.JUnit3Suite
+import kafka.integration.ProducerConsumerTestHarness
+import kafka.integration.KafkaServerTestHarness
+import org.apache.log4j.{Logger, Level}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
+
+/**
+ * Overwrites the start of a log file on disk with a wrong message size and checks that
+ * consumers fail while reading it instead of returning messages.
+ */
+class LogCorruptionTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness with ZooKeeperTestHarness {
+  val zkConnect = TestZKUtils.zookeeperConnect  
+  val port = 9999
+  val props = TestUtils.createBrokerConfig(0, port)
+  val config = new KafkaConfig(props) {
+                 override val hostName = "localhost"
+                 override val enableZookeeper = true
+               }
+  val configs = List(config)
+  val topic = "test"
+  val partition = 0
+
+  /**
+   * Fetches the partition from offset 0 with the SimpleConsumer and expects iterating the
+   * result to raise InvalidMessageSizeException before any message is handed out.
+   */
+  private def assertSimpleConsumerRejectsCorruptLog() {
+    val messageSet = consumer.fetch(new FetchRequest(topic, partition, 0, 10000))
+    try {
+      for (msg <- messageSet)
+        fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
+      fail("shouldn't reach here in SimpleConsumer since log file is corrupted.")
+    }
+    catch {
+      case e: InvalidMessageSizeException => // expected: the corrupted size was detected
+    }
+  }
+
+  def testMessageSizeTooLarge() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    val fetcherLogger = Logger.getLogger(classOf[kafka.consumer.FetcherRunnable])
+
+    // only let FATAL messages of these two loggers through while the test runs
+    requestHandlerLogger.setLevel(Level.FATAL)
+    fetcherLogger.setLevel(Level.FATAL)
+
+    try {
+      // send some messages
+      val sent1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message("hello".getBytes()))
+      producer.send(topic, sent1)
+      Thread.sleep(200)
+
+      // corrupt the file on disk
+      val logFile = new File(config.logDir + File.separator + topic + "-" + partition, Log.nameFromOffset(0))
+      val byteBuffer = ByteBuffer.allocate(4)
+      byteBuffer.putInt(1000) // wrong message size
+      byteBuffer.rewind()
+      val channel = Utils.openChannel(logFile, true)
+      channel.write(byteBuffer)
+      channel.force(true)
+      channel.close
+
+      Thread.sleep(500)
+      // test SimpleConsumer: two separate fetches, both have to fail
+      assertSimpleConsumerRejectsCorruptLog()
+      assertSimpleConsumerRejectsCorruptLog()
+
+      // test ZookeeperConsumer
+      val consumerConfig1 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, "group1", "consumer1", 10000))
+      val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1)
+      try {
+        val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
+        try {
+          for ((topic, messageStreams) <- topicMessageStreams1)
+            for (message <- messageStreams(0))
+              fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.")
+          fail("shouldn't reach here in ZookeeperConsumer since log file is corrupted.")
+        }
+        catch {
+          case e: InvalidMessageSizeException => // expected
+          case e: Exception => // any other exception is accepted as well
+        }
+      } finally {
+        // shut the connector down even when one of the checks above fails
+        zkConsumerConnector1.shutdown
+      }
+    } finally {
+      // always set the logger levels back to ERROR, also after a failure
+      requestHandlerLogger.setLevel(Level.ERROR)
+      fetcherLogger.setLevel(Level.ERROR)
+    }
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,261 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import scala.collection._
+import junit.framework.Assert._
+import kafka.api.{ProducerRequest, FetchRequest}
+import kafka.common.{OffsetOutOfRangeException, InvalidPartitionException}
+import kafka.server.{KafkaRequestHandlers, KafkaConfig}
+import org.apache.log4j.{Level, Logger}
+import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.producer.{ProducerData, Producer, ProducerConfig}
+import kafka.serializer.StringDecoder
+import kafka.utils.TestUtils
+import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, Message, ByteBufferMessageSet}
+
+/**
+ * End to end tests of the primitive apis against a local server
+ */
+class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with KafkaServerTestHarness {
+  
+  val port = 9999 // implements the abstract `port` of ProducerConsumerTestHarness, whose clients connect to it
+  val props = TestUtils.createBrokerConfig(0, port) // first argument is presumably the broker id -- TODO confirm in TestUtils
+  val config = new KafkaConfig(props) {
+                 override val enableZookeeper = false // broker config with ZooKeeper support switched off
+               }
+  val configs = List(config) // presumably the brokers KafkaServerTestHarness starts -- TODO confirm
+  val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandlers]) // muted around requests that are expected to fail
+
+  /** Sends one string through the high-level Producer and checks that a raw fetch decodes back to it. */
+  def testDefaultEncoderProducerAndFetch() {
+    val topic = "test-topic"
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("broker.list", "0:localhost:" + port)
+    val stringProducer1 = new Producer[String, String](new ProducerConfig(props))
+    try {
+      stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message")))
+      Thread.sleep(200) // wait a bit for the produced message to be available
+      val fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+      assertTrue(fetched.iterator.hasNext)
+      val fetchedStringMessage = new StringDecoder().toEvent(fetched.iterator.next.message)
+      assertEquals("test-message", fetchedStringMessage)
+    } finally {
+      // close the producer even when an assertion fails, so its resources are released
+      stringProducer1.close()
+    }
+  }
+
+  /** Sends one string through a Producer configured with compression and checks that a raw fetch decodes back to it. */
+  def testDefaultEncoderProducerAndFetchWithCompression() {
+    val topic = "test-topic"
+    val props = new Properties()
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("broker.list", "0:localhost:" + port)
+    props.put("compression", "true")
+    val stringProducer1 = new Producer[String, String](new ProducerConfig(props))
+    try {
+      stringProducer1.send(new ProducerData[String, String](topic, "test", Array("test-message")))
+      Thread.sleep(200) // wait a bit for the produced message to be available
+      val fetched = consumer.fetch(new FetchRequest(topic, 0, 0, 10000))
+      assertTrue(fetched.iterator.hasNext)
+      val fetchedStringMessage = new StringDecoder().toEvent(fetched.iterator.next.message)
+      assertEquals("test-message", fetchedStringMessage)
+    } finally {
+      // close the producer even when an assertion fails, so its resources are released
+      stringProducer1.close()
+    }
+  }
+
+  /**
+   * Produces two uncompressed messages to each of three topics and checks that a single multifetch
+   * returns them, then checks that multifetches carrying an invalid offset or an invalid partition
+   * fail with OffsetOutOfRangeException and InvalidPartitionException respectively.
+   */
+  def testProduceAndMultiFetch() {
+    // send some messages
+    val topics = List("test1", "test2", "test3")
+    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    for(topic <- topics) {
+      val set = new ByteBufferMessageSet(NoCompressionCodec,
+                                         new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+      messages += topic -> set
+      producer.send(topic, set)
+      // presumably send moves the buffer position -- TODO confirm; rewinding lets the same set
+      // be read again for the comparison below
+      set.getBuffer.rewind
+      fetches += new FetchRequest(topic, 0, 0, 10000)
+    }
+
+    // wait a bit for produced message to be available
+    Thread.sleep(700)
+    val response = consumer.multifetch(fetches: _*)
+    for((topic, resp) <- topics.zip(response.toList))
+      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+
+    // temporarily set request handler logger to a higher level while sending requests that must fail
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      // send some invalid offsets
+      val invalidOffsets = topics.map(topic => new FetchRequest(topic, 0, -1, 10000))
+      try {
+        // either the multifetch itself or reading one of its responses is expected to throw
+        val responses = consumer.multifetch(invalidOffsets: _*)
+        for(resp <- responses)
+          resp.iterator
+        fail("expect exception")
+      }
+      catch {
+        case e: OffsetOutOfRangeException => // this is good
+      }
+
+      // send some invalid partitions
+      val invalidPartitions = topics.map(topic => new FetchRequest(topic, -1, 0, 10000))
+      try {
+        // either the multifetch itself or reading one of its responses is expected to throw
+        val responses = consumer.multifetch(invalidPartitions: _*)
+        for(resp <- responses)
+          resp.iterator
+        fail("expect exception")
+      }
+      catch {
+        case e: InvalidPartitionException => // this is good
+      }
+    }
+    finally {
+      // restore the request handler logger level even when an assertion above fails,
+      // so a failure here cannot leave error logging muted for the tests that follow
+      requestHandlerLogger.setLevel(Level.ERROR)
+    }
+  }
+
+  /**
+   * Produces two messages to each of three topics in sets built with DefaultCompressionCodec and
+   * checks that a single multifetch returns them, then checks that multifetches carrying an invalid
+   * offset or partition fail with OffsetOutOfRangeException and InvalidPartitionException respectively.
+   */
+  def testProduceAndMultiFetchWithCompression() {
+    // send some messages
+    val topics = List("test1", "test2", "test3")
+    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    val fetches = new mutable.ArrayBuffer[FetchRequest]
+    for(topic <- topics) {
+      val set = new ByteBufferMessageSet(DefaultCompressionCodec,
+                                         new Message(("a_" + topic).getBytes), new Message(("b_" + topic).getBytes))
+      messages += topic -> set
+      producer.send(topic, set)
+      // presumably send moves the buffer position -- TODO confirm; rewinding lets the same set
+      // be read again for the comparison below
+      set.getBuffer.rewind
+      fetches += new FetchRequest(topic, 0, 0, 10000)
+    }
+
+    // wait a bit for produced message to be available
+    Thread.sleep(200)
+    val response = consumer.multifetch(fetches: _*)
+    for((topic, resp) <- topics.zip(response.toList))
+      TestUtils.checkEquals(messages(topic).iterator, resp.iterator)
+
+    // temporarily set request handler logger to a higher level while sending requests that must fail
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      // send some invalid offsets
+      val invalidOffsets = topics.map(topic => new FetchRequest(topic, 0, -1, 10000))
+      try {
+        // either the multifetch itself or reading one of its responses is expected to throw
+        val responses = consumer.multifetch(invalidOffsets: _*)
+        for(resp <- responses)
+          resp.iterator
+        fail("expect exception")
+      }
+      catch {
+        case e: OffsetOutOfRangeException => // this is good
+      }
+
+      // send some invalid partitions
+      val invalidPartitions = topics.map(topic => new FetchRequest(topic, -1, 0, 10000))
+      try {
+        // either the multifetch itself or reading one of its responses is expected to throw
+        val responses = consumer.multifetch(invalidPartitions: _*)
+        for(resp <- responses)
+          resp.iterator
+        fail("expect exception")
+      }
+      catch {
+        case e: InvalidPartitionException => // this is good
+      }
+    }
+    finally {
+      // restore the request handler logger level even when an assertion above fails,
+      // so a failure here cannot leave error logging muted for the tests that follow
+      requestHandlerLogger.setLevel(Level.ERROR)
+    }
+  }
+
+  /**
+   * Sends two uncompressed messages to each of three topics with a single multiSend and checks
+   * that a multifetch over the same topics returns exactly what was sent.
+   */
+  def testMultiProduce() {
+    val topics = List("test1", "test2", "test3")
+    // one two-message set per topic, remembered by topic for the comparison at the end
+    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    topics.foreach { name =>
+      messages += name -> new ByteBufferMessageSet(NoCompressionCodec,
+                                                   new Message(("a_" + name).getBytes), new Message(("b_" + name).getBytes))
+    }
+    // produce requests are listed last topic first; fetch requests follow the topic order
+    val produceList = topics.reverse.map(name => new ProducerRequest(name, 0, messages(name)))
+    val fetches = topics.map(name => new FetchRequest(name, 0, 0, 10000))
+    producer.multiSend(produceList.toArray)
+    messages.values.foreach(_.getBuffer.rewind)
+
+    // wait a bit for produced message to be available
+    Thread.sleep(200)
+    topics.zip(consumer.multifetch(fetches: _*).toList).foreach {
+      case (name, resp) => TestUtils.checkEquals(messages(name).iterator, resp.iterator)
+    }
+  }
+
+  /**
+   * Sends two messages to each of three topics, in sets built with DefaultCompressionCodec, with a
+   * single multiSend and checks that a multifetch over the same topics returns exactly what was sent.
+   */
+  def testMultiProduceWithCompression() {
+    val topics = List("test1", "test2", "test3")
+    val messages = new mutable.HashMap[String, ByteBufferMessageSet]
+    var produceList: List[ProducerRequest] = Nil
+    // requests are prepended, so the produce list ends up last topic first
+    topics.foreach { name =>
+      val compressedSet = new ByteBufferMessageSet(DefaultCompressionCodec,
+                                                   new Message(("a_" + name).getBytes), new Message(("b_" + name).getBytes))
+      messages(name) = compressedSet
+      produceList = new ProducerRequest(name, 0, compressedSet) :: produceList
+    }
+    producer.multiSend(produceList.toArray)
+    for (sentSet <- messages.values) sentSet.getBuffer.rewind
+
+    // wait a bit for produced message to be available
+    Thread.sleep(200)
+    val fetches = topics.map(name => new FetchRequest(name, 0, 0, 10000))
+    val response = consumer.multifetch(fetches: _*).toList
+    for ((name, resp) <- topics.zip(response)) TestUtils.checkEquals(messages(name).iterator, resp.iterator)
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.integration
+
+import kafka.consumer.SimpleConsumer
+import org.scalatest.junit.JUnit3Suite
+import java.util.Properties
+import kafka.producer.{SyncProducerConfig, SyncProducer}
+
+trait ProducerConsumerTestHarness extends JUnit3Suite {
+  val port: Int                        // broker port, supplied by the concrete suite
+  val host = "localhost"
+  var producer: SyncProducer = null    // created in setUp, closed in tearDown
+  var consumer: SimpleConsumer = null  // created in setUp, closed in tearDown
+
+  /** Creates the producer and consumer for host:port, then continues the setUp chain. */
+  override def setUp() {
+    val props = new Properties()
+    props.put("host", host)
+    props.put("port", port.toString)
+    props.put("buffer.size", "65536")
+    props.put("connect.timeout.ms", "100000")
+    props.put("reconnect.interval", "10000")
+    producer = new SyncProducer(new SyncProducerConfig(props))
+    consumer = new SimpleConsumer(host, port, 1000000, 64*1024)
+    super.setUp
+  }
+
+  /** Continues the tearDown chain, then closes both clients even when an earlier step throws. */
+  override def tearDown() {
+    try { super.tearDown }
+    finally {
+      // nested try/finally: a failing producer.close() must not prevent closing the consumer
+      try { if (producer != null) producer.close() }
+      finally { if (consumer != null) consumer.close() }
+    }
+  }
+}

Added: incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (added)
+++ incubator/kafka/trunk/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Mon Aug  1 23:41:24 2011
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2010 LinkedIn
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi.consumer
+
+import junit.framework.Assert._
+import kafka.zk.ZooKeeperTestHarness
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import scala.collection._
+import kafka.utils.Utils
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.scalatest.junit.JUnit3Suite
+import scala.collection.JavaConversions._
+import kafka.javaapi.message.ByteBufferMessageSet
+import kafka.consumer.{Consumer, ConsumerConfig, KafkaMessageStream, ConsumerTimeoutException}
+import javax.management.NotCompliantMBeanException
+import org.apache.log4j.{Level, Logger}
+import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, CompressionCodec, Message}
+
+class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHarness with ZooKeeperTestHarness {
+  private val logger = Logger.getLogger(getClass()) // logger for this test class
+
+  val zookeeperConnect = TestZKUtils.zookeeperConnect // ZooKeeper connect value taken from TestZKUtils
+  val zkConnect = zookeeperConnect // alias handed to TestUtils.createConsumerProperties by the tests
+  val numNodes = 2 // number of broker configs created below
+  val numParts = 2 // partitions per broker (overrides numPartitions below)
+  val topic = "topic1" // the single topic every test produces to and consumes from
+  val configs = // one ZooKeeper-enabled broker config per node
+    for(props <- TestUtils.createBrokerConfigs(numNodes))
+    yield new KafkaConfig(props) {
+      override val enableZookeeper = true
+      override val numPartitions = numParts
+      override val zkConnect = zookeeperConnect // outer val; a bare zkConnect here would name this overriding member itself
+    }
+  val group = "group1" // consumer group shared by every connector in this suite
+  val consumer0 = "consumer0" // consumer ids passed to TestUtils.createConsumerProperties
+  val consumer1 = "consumer1"
+  val consumer2 = "consumer2"
+  val consumer3 = "consumer3"
+  val nMessages = 2 // messages sent to each partition of each broker per batch
+
+  /** Covers the consumer timeout, one consumer, two consumers sharing a group, and a consumer with no topics. */
+  def testBasic() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      // test consumer timeout logic
+      val consumerConfig0 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+        override val consumerTimeoutMs = 200
+      }
+      val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+      val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+      try {
+        getMessages(nMessages*2, topicMessageStreams0)
+        fail("should get an exception")
+      } catch {
+        case e: ConsumerTimeoutException => // this is ok; any other exception propagates and fails the test
+      }
+      zkConsumerConnector0.shutdown
+
+      // send some messages to each broker
+      val sentMessages1 = sendMessages(nMessages, "batch1")
+      // create a consumer
+      val consumerConfig1 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+      val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+      val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+      val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+      assertEquals(sentMessages1, receivedMessages1)
+      // commit consumed offsets
+      zkConsumerConnector1.commitOffsets
+
+      // create a consumer
+      val consumerConfig2 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+      val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+      val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+      // send some messages to each broker
+      val sentMessages2 = sendMessages(nMessages, "batch2")
+      Thread.sleep(200)
+      val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+      val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+      val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+      assertEquals(sentMessages2, receivedMessages2)
+
+      // create a consumer with empty map
+      val consumerConfig3 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer3))
+      val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+      val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(toJavaMap(new mutable.HashMap[String, Int]()))
+      // send some messages to each broker
+      Thread.sleep(200)
+      val sentMessages3 = sendMessages(nMessages, "batch3")
+      Thread.sleep(200)
+      val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+      val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+      val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+      assertEquals(sentMessages3, receivedMessages3)
+
+      zkConsumerConnector1.shutdown
+      zkConsumerConnector2.shutdown
+      zkConsumerConnector3.shutdown
+      logger.info("all consumer connectors stopped")
+    } finally {
+      requestHandlerLogger.setLevel(Level.ERROR) // restore the level even when an assertion above fails
+    }
+  }
+
+  /** Same scenarios as the uncompressed case, with every batch sent using DefaultCompressionCodec. */
+  def testCompression() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      // test consumer timeout logic
+      val consumerConfig0 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+        override val consumerTimeoutMs = 200
+      }
+      val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+      val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+      try {
+        getMessages(nMessages*2, topicMessageStreams0)
+        fail("should get an exception")
+      } catch {
+        case e: ConsumerTimeoutException => // this is ok; any other exception propagates and fails the test
+      }
+      zkConsumerConnector0.shutdown
+
+      // send some messages to each broker
+      val sentMessages1 = sendMessages(nMessages, "batch1", DefaultCompressionCodec)
+      // create a consumer
+      val consumerConfig1 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+      val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
+      val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+      val receivedMessages1 = getMessages(nMessages*2, topicMessageStreams1)
+      assertEquals(sentMessages1, receivedMessages1)
+      // commit consumed offsets
+      zkConsumerConnector1.commitOffsets
+
+      // create a consumer
+      val consumerConfig2 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer2))
+      val zkConsumerConnector2 = new ZookeeperConsumerConnector(consumerConfig2, true)
+      val topicMessageStreams2 = zkConsumerConnector2.createMessageStreams(toJavaMap(Predef.Map(topic -> numNodes*numParts/2)))
+      // send some messages to each broker
+      val sentMessages2 = sendMessages(nMessages, "batch2", DefaultCompressionCodec)
+      Thread.sleep(200)
+      val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
+      val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
+      val receivedMessages2 = (receivedMessages2_1 ::: receivedMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
+      assertEquals(sentMessages2, receivedMessages2)
+
+      // create a consumer with empty map
+      val consumerConfig3 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer3))
+      val zkConsumerConnector3 = new ZookeeperConsumerConnector(consumerConfig3, true)
+      val topicMessageStreams3 = zkConsumerConnector3.createMessageStreams(toJavaMap(new mutable.HashMap[String, Int]()))
+      // send some messages to each broker
+      Thread.sleep(200)
+      val sentMessages3 = sendMessages(nMessages, "batch3", DefaultCompressionCodec)
+      Thread.sleep(200)
+      val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
+      val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
+      val receivedMessages3 = (receivedMessages3_1 ::: receivedMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
+      assertEquals(sentMessages3, receivedMessages3)
+
+      zkConsumerConnector1.shutdown
+      zkConsumerConnector2.shutdown
+      zkConsumerConnector3.shutdown
+      logger.info("all consumer connectors stopped")
+    } finally {
+      requestHandlerLogger.setLevel(Level.ERROR) // restore the level even when an assertion above fails
+    }
+  }
+
+  /** Checks that compressed message sets are delivered in full to a second consumer after a first one read only part. */
+  def testCompressionSetConsumption() {
+    val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandlers])
+    requestHandlerLogger.setLevel(Level.FATAL)
+    try {
+      // shutdown one server
+      servers.last.shutdown
+      Thread.sleep(500)
+
+      // send some messages to the first broker only
+      val sentMessages = sendMessages(configs.head, 200, "batch1", DefaultCompressionCodec)
+      // consume only part of what was sent, then shut the first consumer down
+      val consumerConfig0 = new ConsumerConfig(
+        TestUtils.createConsumerProperties(zkConnect, group, consumer0)) {
+        override val consumerTimeoutMs = 5000
+      }
+      val zkConsumerConnector0 = new ZookeeperConsumerConnector(consumerConfig0, true)
+      val topicMessageStreams0 = zkConsumerConnector0.createMessageStreams(toJavaMap(Predef.Map(topic -> 1)))
+      getMessages(100, topicMessageStreams0)
+      zkConsumerConnector0.shutdown
+      // at this point, only some part of the message set was consumed. So consumed offset should still be 0
+      // also fetched offset should be 0
+      val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig0, true)
+      val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(toJavaMap(Predef.Map(topic -> 1)))
+      val receivedMessages = getMessages(400, topicMessageStreams1)
+      val sortedReceivedMessages = receivedMessages.sortWith((s,t) => s.checksum < t.checksum)
+      val sortedSentMessages = sentMessages.sortWith((s,t) => s.checksum < t.checksum)
+      assertEquals(sortedSentMessages, sortedReceivedMessages)
+      zkConsumerConnector1.shutdown
+    } finally {
+      requestHandlerLogger.setLevel(Level.ERROR) // restore the level even when an assertion above fails
+    }
+  }
+
+  /** Sends messagesPerNode messages to each of the numParts partitions of one broker; returns them, last sent first. */
+  def sendMessages(conf: KafkaConfig, messagesPerNode: Int, header: String, compressed: CompressionCodec): List[Message]= {
+    var messages: List[Message] = Nil
+    val producer = kafka.javaapi.Implicits.toJavaSyncProducer(TestUtils.createProducer("localhost", conf.port))
+    try {
+      for (partition <- 0 until numParts) {
+        val ms = 0.until(messagesPerNode).map(x => new Message((header + conf.brokerId + "-" + partition + "-" + x).getBytes)).toArray
+        val mSet = new ByteBufferMessageSet(compressionCodec = compressed, messages = getMessageList(ms: _*))
+        for (message <- ms) messages ::= message
+        producer.send(topic, partition, mSet)
+      }
+    } finally producer.close() // close the producer even when a send throws, so it is not leaked
+    messages
+  }
+
+  /** Sends messagesPerNode messages per partition to every configured broker; returns them sorted by checksum. */
+  def sendMessages(messagesPerNode: Int, header: String, compressed: CompressionCodec = NoCompressionCodec): List[Message]= {
+    // gather what was sent to each broker, in broker order, before sorting
+    val sent = configs.toList.flatMap(conf => sendMessages(conf, messagesPerNode, header, compressed))
+    // order by checksum so the result can be compared against the sorted output of getMessages
+    sent.sortWith((first, second) => first.checksum < second.checksum)
+  }
+
+  /** Reads nMessagesPerThread messages from every stream of every topic; returns them all sorted by checksum. */
+  def getMessages(nMessagesPerThread: Int, jTopicMessageStreams: java.util.Map[String, java.util.List[KafkaMessageStream]])
+  : List[Message]= {
+    var received: List[Message] = Nil
+    for ((_, streams) <- asMap(jTopicMessageStreams); stream <- streams) {
+      val pending = stream.iterator
+      var remaining = nMessagesPerThread
+      while (remaining > 0) {
+        assertTrue(pending.hasNext)
+        val message = pending.next
+        received ::= message
+        logger.debug("received message: " + Utils.toString(message.payload, "UTF-8"))
+        remaining -= 1
+      }
+    }
+    received.sortWith((first, second) => first.checksum < second.checksum)
+  }
+
+  /** Creating the Java consumer connector must not fail with NotCompliantMBeanException. */
+  def testJMX() {
+    val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer0))
+    try {
+      // shut the connector down again so it is not left running after the test
+      Consumer.createJavaConsumerConnector(consumerConfig).shutdown()
+    } catch { case e: NotCompliantMBeanException => fail("Should not fail with NotCompliantMBeanException") }
+  }
+
+  /** Copies the given messages, in order, into a new java.util.ArrayList. */
+  private def getMessageList(messages: Message*): java.util.List[Message] = {
+    val copied = new java.util.ArrayList[Message]()
+    messages.foldLeft(copied) { (target, message) => target.add(message); target }
+  }
+
+  /** Copies a Scala String-to-Int map into a new java.util.HashMap, boxing each value as java.lang.Integer. */
+  private def toJavaMap(scalaMap: Map[String, Int]): java.util.Map[String, java.lang.Integer] = {
+    val converted = new java.util.HashMap[String, java.lang.Integer]()
+    scalaMap.foldLeft(converted) { case (target, (key, count)) => target.put(key, java.lang.Integer.valueOf(count)); target }
+  }
+}