Posted to commits@kafka.apache.org by jj...@apache.org on 2014/03/14 23:14:49 UTC

[3/5] KAFKA-1012; Consumer offset management in Kafka; patched by Tejas Patil and Joel Koshy; feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram Subramanian, Joe Stein, Chris Riccomini
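
For context, the commit/fetch round trip that the new TestOffsetManager stress tool and the updated OffsetCommitTest below exercise looks roughly like the sketch that follows. This is not part of the patch: the group name, topic, and ZooKeeper address are placeholders, and the calls simply mirror the ones visible in the diff (ClientUtils.channelToOffsetManager, OffsetCommitRequest with OffsetAndMetadata, OffsetFetchRequest).

import org.I0Itec.zkclient.ZkClient
import kafka.api.{OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
import kafka.client.ClientUtils
import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
import kafka.utils.ZKStringSerializer
import scala.collection.mutable

object OffsetRoundTripSketch {
  def main(args: Array[String]) {
    val group = "example-group"                    // placeholder consumer group
    val tp = TopicAndPartition("example-topic", 0) // placeholder topic/partition
    val zkClient = new ZkClient("localhost:2181", 6000, 2000, ZKStringSerializer)

    // Locate the offset manager (coordinator) for the group and connect to it.
    val channel = ClientUtils.channelToOffsetManager(group, zkClient, 10000)

    // Commit offset 42 for the partition, with a small metadata string.
    channel.send(OffsetCommitRequest(group, mutable.Map(tp -> OffsetAndMetadata(42L, "some metadata"))))
    val commitResponse = OffsetCommitResponse.readFrom(channel.receive().buffer)
    assert(commitResponse.commitStatus(tp) == ErrorMapping.NoError)

    // Fetch the offset back and print it.
    channel.send(OffsetFetchRequest(group, Seq(tp)))
    val fetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
    println("Fetched offset: " + fetchResponse.requestInfo(tp).offset)

    channel.disconnect()
    zkClient.close()
  }
}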

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
new file mode 100644
index 0000000..83317f0
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -0,0 +1,291 @@
+package other.kafka
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.api._
+import kafka.utils.{ShutdownableThread, ZKStringSerializer}
+import scala.collection._
+import kafka.client.ClientUtils
+import joptsimple.OptionParser
+import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
+import kafka.network.BlockingChannel
+import scala.util.Random
+import java.io.IOException
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import java.util.concurrent.TimeUnit
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.atomic.AtomicInteger
+import java.nio.channels.ClosedByInterruptException
+
+
+object TestOffsetManager {
+
+  val random = new Random
+  val SocketTimeoutMs = 10000
+
+  class StatsThread(reportingIntervalMs: Long, commitThreads: Seq[CommitThread], fetchThread: FetchThread)
+        extends ShutdownableThread("stats-thread") {
+
+    def printStats() {
+      println("--------------------------------------------------------------------------------")
+      println("Aggregate stats for commits:")
+      println("Error count: %d; Max:%f; Min: %f; Mean: %f; Commit count: %d".format(
+        commitThreads.map(_.numErrors.get).sum,
+        commitThreads.map(_.timer.max()).max,
+        commitThreads.map(_.timer.min()).min,
+        commitThreads.map(_.timer.mean()).sum / commitThreads.size,
+        commitThreads.map(_.numCommits.get).sum))
+      println("--------------------------------------------------------------------------------")
+      commitThreads.foreach(t => println(t.stats))
+      println(fetchThread.stats)
+    }
+
+    override def doWork() {
+      printStats()
+      Thread.sleep(reportingIntervalMs)
+    }
+
+  }
+
+  class CommitThread(id: Int, partitionCount: Int, commitIntervalMs: Long, zkClient: ZkClient)
+        extends ShutdownableThread("commit-thread")
+        with KafkaMetricsGroup {
+
+    private val group = "group-" + id
+    private val metadata = "Metadata from commit thread " + id
+    private var offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+    private var offset = 0L
+    val numErrors = new AtomicInteger(0)
+    val numCommits = new AtomicInteger(0)
+    val timer = newTimer("commit-thread", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+    private val commitTimer = new KafkaTimer(timer)
+    val shutdownLock = new Object
+
+    private def ensureConnected() {
+      if (!offsetsChannel.isConnected)
+        offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+    }
+
+    override def doWork() {
+      val commitRequest = OffsetCommitRequest(group, mutable.Map((1 to partitionCount).map(TopicAndPartition("topic-" + id, _) -> OffsetAndMetadata(offset, metadata)):_*))
+      try {
+        ensureConnected()
+        offsetsChannel.send(commitRequest)
+        numCommits.getAndIncrement
+        commitTimer.time {
+          val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer)
+          if (response.commitStatus.exists(_._2 != ErrorMapping.NoError)) numErrors.getAndIncrement
+        }
+        offset += 1
+      }
+      catch {
+        case e1: ClosedByInterruptException =>
+          offsetsChannel.disconnect()
+        case e2: IOException =>
+          println("Commit thread %d: Error while committing offsets to %s:%d for group %s due to %s.".format(id, offsetsChannel.host, offsetsChannel.port, group, e2))
+          offsetsChannel.disconnect()
+      }
+      finally {
+        Thread.sleep(commitIntervalMs)
+      }
+    }
+
+    override def shutdown() {
+      super.shutdown()
+      awaitShutdown()
+      offsetsChannel.disconnect()
+      println("Commit thread %d ended. Last committed offset: %d.".format(id, offset))
+    }
+
+    def stats = {
+      "Commit thread %d :: Error count: %d; Max:%f; Min: %f; Mean: %f; Commit count: %d"
+      .format(id, numErrors.get(), timer.max(), timer.min(), timer.mean(), numCommits.get())
+    }
+  }
+
+  class FetchThread(numGroups: Int, fetchIntervalMs: Long, zkClient: ZkClient)
+        extends ShutdownableThread("fetch-thread")
+        with KafkaMetricsGroup {
+
+    private val timer = newTimer("fetch-thread", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+    private val fetchTimer = new KafkaTimer(timer)
+
+    private val channels = mutable.Map[Int, BlockingChannel]()
+    private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+
+    private val numErrors = new AtomicInteger(0)
+
+    override def doWork() {
+      val id = random.nextInt().abs % numGroups
+      val group = "group-" + id
+      try {
+        metadataChannel.send(ConsumerMetadataRequest(group))
+        val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinator.map(_.id).getOrElse(-1)
+
+        val channel = if (channels.contains(coordinatorId))
+          channels(coordinatorId)
+        else {
+          val newChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+          channels.put(coordinatorId, newChannel)
+          newChannel
+        }
+
+        try {
+          // send the offset fetch request
+          val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition("topic-"+id, 1)))
+          channel.send(fetchRequest)
+
+          fetchTimer.time {
+            val response = OffsetFetchResponse.readFrom(channel.receive().buffer)
+            if (response.requestInfo.exists(_._2.error != ErrorMapping.NoError)) {
+              numErrors.getAndIncrement
+            }
+          }
+        }
+        catch {
+          case e1: ClosedByInterruptException =>
+            channel.disconnect()
+            channels.remove(coordinatorId)
+          case e2: IOException =>
+            println("Error while fetching offset from %s:%d due to %s.".format(channel.host, channel.port, e2))
+            channel.disconnect()
+            channels.remove(coordinatorId)
+        }
+      }
+      catch {
+        case e: IOException =>
+          println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
+          metadataChannel.disconnect()
+          println("Creating new query channel.")
+          metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+      }
+      finally {
+        Thread.sleep(fetchIntervalMs)
+      }
+
+    }
+
+    override def shutdown() {
+      super.shutdown()
+      awaitShutdown()
+      channels.foreach(_._2.disconnect())
+      metadataChannel.disconnect()
+    }
+
+    def stats = {
+      "Fetch thread :: Error count: %d; Max:%f; Min: %f; Mean: %f; Fetch count: %d"
+      .format(numErrors.get(), timer.max(), timer.min(), timer.mean(), timer.count())
+    }
+  }
+
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val zookeeperOpt = parser.accepts("zookeeper", "The ZooKeeper connection URL.")
+      .withRequiredArg
+      .describedAs("ZooKeeper URL")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("localhost:2181")
+
+    val commitIntervalOpt = parser.accepts("commit-interval-ms", "Offset commit interval.")
+      .withRequiredArg
+      .describedAs("interval")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
+
+    val fetchIntervalOpt = parser.accepts("fetch-interval-ms", "Offset fetch interval.")
+      .withRequiredArg
+      .describedAs("interval")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1000)
+
+    val numPartitionsOpt = parser.accepts("partition-count", "Number of partitions per commit.")
+      .withRequiredArg
+      .describedAs("count")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
+    val numThreadsOpt = parser.accepts("thread-count", "Number of commit threads.")
+      .withRequiredArg
+      .describedAs("threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
+    val reportingIntervalOpt = parser.accepts("reporting-interval-ms", "Interval at which stats are reported.")
+      .withRequiredArg
+      .describedAs("interval (ms)")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3000)
+
+    val helpOpt = parser.accepts("help", "Print this message.")
+
+    val options = parser.parse(args : _*)
+
+    if (options.has(helpOpt)) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    val commitIntervalMs = options.valueOf(commitIntervalOpt).intValue()
+    val fetchIntervalMs = options.valueOf(fetchIntervalOpt).intValue()
+    val threadCount = options.valueOf(numThreadsOpt).intValue()
+    val partitionCount = options.valueOf(numPartitionsOpt).intValue()
+    val zookeeper = options.valueOf(zookeeperOpt)
+    val reportingIntervalMs = options.valueOf(reportingIntervalOpt).intValue()
+    println("Commit thread count: %d; Partition count: %d, Commit interval: %d ms; Fetch interval: %d ms; Reporting interval: %d ms"
+            .format(threadCount, partitionCount, commitIntervalMs, fetchIntervalMs, reportingIntervalMs))
+
+    var zkClient: ZkClient = null
+    var commitThreads: Seq[CommitThread] = Seq()
+    var fetchThread: FetchThread = null
+    var statsThread: StatsThread = null
+    try {
+      zkClient = new ZkClient(zookeeper, 6000, 2000, ZKStringSerializer)
+      commitThreads = (0 to (threadCount-1)).map { threadId =>
+        new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient)
+      }
+
+      fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkClient)
+
+      statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread)
+
+      Runtime.getRuntime.addShutdownHook(new Thread() {
+        override def run() {
+          cleanShutdown()
+          statsThread.printStats()
+        }
+      })
+
+      commitThreads.foreach(_.start())
+
+      fetchThread.start()
+
+      statsThread.start()
+
+      commitThreads.foreach(_.join())
+      fetchThread.join()
+      statsThread.join()
+    }
+    catch {
+      case e: Throwable =>
+        println("Error: ", e)
+    }
+    finally {
+      cleanShutdown()
+    }
+
+    def cleanShutdown() {
+      commitThreads.foreach(_.shutdown())
+      commitThreads.foreach(_.join())
+      if (fetchThread != null) {
+        fetchThread.shutdown()
+        fetchThread.join()
+      }
+      if (statsThread != null) {
+        statsThread.shutdown()
+        statsThread.join()
+      }
+      zkClient.close()
+    }
+
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
deleted file mode 100644
index 31534ca..0000000
--- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import consumer._
-import utils.Utils
-import java.util.concurrent.CountDownLatch
-
-object TestZKConsumerOffsets {
-  def main(args: Array[String]): Unit = {
-    if(args.length < 1) {
-      println("USAGE: " + TestZKConsumerOffsets.getClass.getName + " consumer.properties topic latest")
-      System.exit(1)
-    }
-    println("Starting consumer...")
-    val topic = args(1)
-    val autoOffsetReset = args(2)    
-    val props = Utils.loadProps(args(0))
-    props.put("auto.offset.reset", "largest")
-    
-    val config = new ConsumerConfig(props)
-    val consumerConnector: ConsumerConnector = Consumer.create(config)
-    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
-    var threadList = List[ConsumerThread]()
-    for ((topic, streamList) <- topicMessageStreams)
-      for (stream <- streamList)
-        threadList ::= new ConsumerThread(stream)
-
-    for (thread <- threadList)
-      thread.start
-
-    // attach shutdown handler to catch control-c
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run() = {
-        consumerConnector.shutdown
-        threadList.foreach(_.shutdown)
-        println("consumer threads shutted down")
-      }
-    })
-  }
-}
-
-private class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread {
-  val shutdownLatch = new CountDownLatch(1)
-
-  override def run() {
-    println("Starting consumer thread..")
-    for (messageAndMetadata <- stream) {
-      println("consumed: " + new String(messageAndMetadata.message, "UTF-8"))
-    }
-    shutdownLatch.countDown
-    println("thread shutdown !" )
-  }
-
-  def shutdown() {
-    shutdownLatch.await
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index dbe078c..6db76a5 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -142,13 +142,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
       val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
       val errorCode = offsetResponse.partitionErrorAndOffsets(topicAndPartition).error
       assertTrue("Offset request should fail with UnknownTopicOrPartitionCode", errorCode == ErrorMapping.UnknownTopicOrPartitionCode)
-      // test if offset fetch requests fail during delete topic
-      val offsetFetchRequest = new OffsetFetchRequest("test-group", Seq(topicAndPartition))
-      val offsetFetchResponse = consumer.fetchOffsets(offsetFetchRequest)
-      val offsetFetchErrorCode = offsetFetchResponse.requestInfo(topicAndPartition).error
-      assertTrue("Offset fetch request should fail with UnknownTopicOrPartitionCode",
-        offsetFetchErrorCode == ErrorMapping.UnknownTopicOrPartitionCode)
-      // TODO: test if offset commit requests fail during delete topic
     }
     // restart follower replica
     follower.startup()

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index eb274d1..5378446 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -23,12 +23,12 @@ import junit.framework.Assert._
 import java.nio.ByteBuffer
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.cluster.Broker
-import collection.mutable._
-import kafka.common.{TopicAndPartition, ErrorMapping, OffsetMetadataAndError}
+import kafka.common.{OffsetAndMetadata, TopicAndPartition, ErrorMapping, OffsetMetadataAndError}
 import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.utils.SystemTime
 
 
-object SerializationTestUtils{
+object SerializationTestUtils {
   private val topic1 = "test1"
   private val topic2 = "test2"
   private val leader1 = 0
@@ -147,17 +147,15 @@ object SerializationTestUtils{
   }
 
   def createTestOffsetCommitRequest: OffsetCommitRequest = {
-    new OffsetCommitRequest("group 1", collection.immutable.Map(
-      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="some metadata"),
-      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(offset=100L, metadata=OffsetMetadataAndError.NoMetadata)
+    new OffsetCommitRequest("group 1", collection.mutable.Map(
+      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
+      TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
     ))
   }
 
   def createTestOffsetCommitResponse: OffsetCommitResponse = {
-    new OffsetCommitResponse(collection.immutable.Map(
-      TopicAndPartition(topic1, 0) -> ErrorMapping.NoError,
-      TopicAndPartition(topic1, 1) -> ErrorMapping.UnknownTopicOrPartitionCode
-    ))
+    new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> ErrorMapping.NoError,
+                                 TopicAndPartition(topic1, 1) -> ErrorMapping.NoError))
   }
 
   def createTestOffsetFetchRequest: OffsetFetchRequest = {
@@ -170,11 +168,18 @@ object SerializationTestUtils{
   def createTestOffsetFetchResponse: OffsetFetchResponse = {
     new OffsetFetchResponse(collection.immutable.Map(
       TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", ErrorMapping.NoError),
-      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadataAndError.NoMetadata,
-        ErrorMapping.UnknownTopicOrPartitionCode)
+      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
     ))
   }
 
+  def createConsumerMetadataRequest: ConsumerMetadataRequest = {
+    ConsumerMetadataRequest("group 1", clientId = "client 1")
+  }
+
+  def createConsumerMetadataResponse: ConsumerMetadataResponse = {
+    ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError)
+  }
+
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -193,114 +198,30 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse
   private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
-
+  private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
+  private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
 
   @Test
   def testSerializationAndDeserialization() {
-    var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndIsrRequest.sizeInBytes())
-    leaderAndIsrRequest.writeTo(buffer)
-    buffer.rewind()
-    val deserializedLeaderAndIsrRequest = LeaderAndIsrRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndIsrRequest,
-                 deserializedLeaderAndIsrRequest)
-
-    buffer = ByteBuffer.allocate(leaderAndIsrResponse.sizeInBytes())
-    leaderAndIsrResponse.writeTo(buffer)
-    buffer.rewind()
-    val deserializedLeaderAndIsrResponse = LeaderAndIsrResponse.readFrom(buffer)
-    assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndIsrResponse,
-                 deserializedLeaderAndIsrResponse)
-
-    buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes())
-    stopReplicaRequest.writeTo(buffer)
-    buffer.rewind()
-    val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed stopReplicaRequest should be the same", stopReplicaRequest,
-                 deserializedStopReplicaRequest)
-
-    buffer = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes())
-    stopReplicaResponse.writeTo(buffer)
-    buffer.rewind()
-    val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(buffer)
-    assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse,
-                 deserializedStopReplicaResponse)
-
-    buffer = ByteBuffer.allocate(producerRequest.sizeInBytes)
-    producerRequest.writeTo(buffer)
-    buffer.rewind()
-    val deserializedProducerRequest = ProducerRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed producerRequest should be the same", producerRequest,
-                 deserializedProducerRequest)
-
-    buffer = ByteBuffer.allocate(producerResponse.sizeInBytes)
-    producerResponse.writeTo(buffer)
-    buffer.rewind()
-    val deserializedProducerResponse = ProducerResponse.readFrom(buffer)
-    assertEquals("The original and deserialzed producerResponse should be the same: [%s], [%s]".format(producerResponse, deserializedProducerResponse), producerResponse,
-                 deserializedProducerResponse)
-
-    buffer = ByteBuffer.allocate(fetchRequest.sizeInBytes)
-    fetchRequest.writeTo(buffer)
-    buffer.rewind()
-    val deserializedFetchRequest = FetchRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest,
-                 deserializedFetchRequest)
-
-    buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes)
-    offsetRequest.writeTo(buffer)
-    buffer.rewind()
-    val deserializedOffsetRequest = OffsetRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed offsetRequest should be the same", offsetRequest,
-                 deserializedOffsetRequest)
-
-    buffer = ByteBuffer.allocate(offsetResponse.sizeInBytes)
-    offsetResponse.writeTo(buffer)
-    buffer.rewind()
-    val deserializedOffsetResponse = OffsetResponse.readFrom(buffer)
-    assertEquals("The original and deserialzed offsetResponse should be the same", offsetResponse,
-                 deserializedOffsetResponse)
-
-    buffer = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes())
-    topicMetadataRequest.writeTo(buffer)
-    buffer.rewind()
-    val deserializedTopicMetadataRequest = TopicMetadataRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed topicMetadataRequest should be the same", topicMetadataRequest,
-                 deserializedTopicMetadataRequest)
-
-    buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
-    topicMetadataResponse.writeTo(buffer)
-    buffer.rewind()
-    val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
-    assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
-                 deserializedTopicMetadataResponse)
-
-    buffer = ByteBuffer.allocate(offsetCommitRequest.sizeInBytes)
-    offsetCommitRequest.writeTo(buffer)
-    buffer.rewind()
-    val deserializedOffsetCommitRequest = OffsetCommitRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed offsetCommitRequest should be the same", offsetCommitRequest, 
-      deserializedOffsetCommitRequest)
-
-    buffer = ByteBuffer.allocate(offsetCommitResponse.sizeInBytes)
-    offsetCommitResponse.writeTo(buffer)
-    buffer.rewind()
-    val deserializedOffsetCommitResponse = OffsetCommitResponse.readFrom(buffer)
-    assertEquals("The original and deserialzed offsetCommitResponse should be the same", offsetCommitResponse, 
-      deserializedOffsetCommitResponse)
-
-    buffer = ByteBuffer.allocate(offsetFetchRequest.sizeInBytes)
-    offsetFetchRequest.writeTo(buffer)
-    buffer.rewind()
-    val deserializedOffsetFetchRequest = OffsetFetchRequest.readFrom(buffer)
-    assertEquals("The original and deserialzed offsetFetchRequest should be the same", offsetFetchRequest, 
-      deserializedOffsetFetchRequest)
-
-    buffer = ByteBuffer.allocate(offsetFetchResponse.sizeInBytes)
-    offsetFetchResponse.writeTo(buffer)
-    buffer.rewind()
-    val deserializedOffsetFetchResponse = OffsetFetchResponse.readFrom(buffer)
-    assertEquals("The original and deserialzed offsetFetchResponse should be the same", offsetFetchResponse, 
-      deserializedOffsetFetchResponse)
 
+    val requestsAndResponses =
+      collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse,
+                               stopReplicaRequest, stopReplicaResponse,
+                               producerRequest, producerResponse,
+                               fetchRequest,
+                               offsetRequest, offsetResponse,
+                               topicMetadataRequest, topicMetadataResponse,
+                               offsetCommitRequest, offsetCommitResponse,
+                               offsetFetchRequest, offsetFetchResponse,
+                               consumerMetadataRequest, consumerMetadataResponse)
+
+    requestsAndResponses.foreach { original =>
+      val buffer = ByteBuffer.allocate(original.sizeInBytes)
+      original.writeTo(buffer)
+      buffer.rewind()
+      val deserializer = original.getClass.getDeclaredMethod("readFrom", classOf[ByteBuffer])
+      val deserialized = deserializer.invoke(null, buffer)
+      assertEquals("The original and deserialized request/response should be the same.", original, deserialized)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index cf2724b..d903a6f 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -21,6 +21,7 @@ package kafka.consumer
 import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
+import kafka.server.OffsetManager
 
 
 class TopicFilterTest extends JUnitSuite {
@@ -29,19 +30,30 @@ class TopicFilterTest extends JUnitSuite {
   def testWhitelists() {
 
     val topicFilter1 = new Whitelist("white1,white2")
-    assertTrue(topicFilter1.isTopicAllowed("white2"))
-    assertFalse(topicFilter1.isTopicAllowed("black1"))
+    assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = false))
+    assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
+    assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
 
     val topicFilter2 = new Whitelist(".+")
-    assertTrue(topicFilter2.isTopicAllowed("alltopics"))
-    
+    assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
+    assertFalse(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
+    assertTrue(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
+
     val topicFilter3 = new Whitelist("white_listed-topic.+")
-    assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
-    assertFalse(topicFilter3.isTopicAllowed("black1"))
+    assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
+    assertFalse(topicFilter3.isTopicAllowed("black1", excludeInternalTopics = true))
   }
 
   @Test
   def testBlacklists() {
     val topicFilter1 = new Blacklist("black1")
+    assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = false))
+    assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
+    assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
+
+    assertFalse(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 8fe7259..258dd25 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -115,7 +115,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
-    zkConsumerConnector1.commitOffsets
+    zkConsumerConnector1.commitOffsets()
 
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {
@@ -194,7 +194,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
     assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
-    zkConsumerConnector1.commitOffsets
+    zkConsumerConnector1.commitOffsets()
 
     // create a consumer
     val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 6a96d80..e632997 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -23,13 +23,13 @@ import junit.framework.Assert._
 import java.util.Properties
 import kafka.consumer.SimpleConsumer
 import org.junit.{After, Before, Test}
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.zk.ZooKeeperTestHarness
 import org.scalatest.junit.JUnit3Suite
-import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
+import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest}
 import kafka.utils.TestUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
+import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
 import scala.util.Random
+import scala.collection._
 import kafka.admin.AdminUtils
 
 class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -39,6 +39,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
   var server: KafkaServer = null
   var logSize: Int = 100
   val brokerPort: Int = 9099
+  val group = "test-group"
   var simpleConsumer: SimpleConsumer = null
   var time: Time = new MockTime()
 
@@ -51,6 +52,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     time = new MockTime()
     server = TestUtils.createServer(new KafkaConfig(config), time)
     simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client")
+    val consumerMetadataRequest = ConsumerMetadataRequest(group)
+    Stream.continually {
+      val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest)
+      consumerMetadataResponse.coordinator.isDefined
+    }.dropWhile(success => {
+      if (!success) Thread.sleep(1000)
+      !success
+    })
   }
 
   @After
@@ -72,34 +81,34 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-    val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(offset=42L)))
+    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
 
     // Fetch it and verify
-    val fetchRequest = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+    val fetchRequest = OffsetFetchRequest(group, Seq(topicAndPartition))
     val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
 
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error)
-    //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
+    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
 
     // Commit a new offset
-    val commitRequest1 = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+    val commitRequest1 = OffsetCommitRequest(group, mutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=100L,
       metadata="some metadata"
     )))
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
 
-    assertEquals(ErrorMapping.NoError, commitResponse1.requestInfo.get(topicAndPartition).get)
+    assertEquals(ErrorMapping.NoError, commitResponse1.commitStatus.get(topicAndPartition).get)
 
     // Fetch it and verify
-    val fetchRequest1 = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+    val fetchRequest1 = OffsetFetchRequest(group, Seq(topicAndPartition))
     val fetchResponse1 = simpleConsumer.fetchOffsets(fetchRequest1)
-    
+
     assertEquals(ErrorMapping.NoError, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
-    //assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
+    assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
     assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
 
   }
@@ -111,34 +120,19 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val topic3 = "topic-3"
     val topic4 = "topic-4"
 
-    val expectedReplicaAssignment = Map(0  -> List(1))
-    // create the topic
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, expectedReplicaAssignment)
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic2, expectedReplicaAssignment)
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic3, expectedReplicaAssignment)
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic4, expectedReplicaAssignment)
-    var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0, 1000)
-    assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-    leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 0, 1000)
-    assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-    leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 0, 1000)
-    assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-    leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic4, 0, 1000)
-    assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-
-    val commitRequest = OffsetCommitRequest("test-group", Map(
-      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="metadata one"),
-      TopicAndPartition(topic2, 0) -> OffsetMetadataAndError(offset=43L, metadata="metadata two"),
-      TopicAndPartition(topic3, 0) -> OffsetMetadataAndError(offset=44L, metadata="metadata three"),
-      TopicAndPartition(topic2, 1) -> OffsetMetadataAndError(offset=45L)
+    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(
+      TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"),
+      TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=43L, metadata="metadata two"),
+      TopicAndPartition(topic3, 0) -> OffsetAndMetadata(offset=44L, metadata="metadata three"),
+      TopicAndPartition(topic2, 1) -> OffsetAndMetadata(offset=45L)
     ))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
-    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get)
-    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get)
-    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get)
-    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 1)).get)
 
-    val fetchRequest = OffsetFetchRequest("test-group", Seq(
+    val fetchRequest = OffsetFetchRequest(group, Seq(
       TopicAndPartition(topic1, 0),
       TopicAndPartition(topic2, 0),
       TopicAndPartition(topic3, 0),
@@ -152,22 +146,22 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
+    assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
+    assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)
 
-    //assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
-    //assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)
-    //assertEquals("metadata three", fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.metadata)
-    //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
-    //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
-    //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
+    assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
+    assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)
+    assertEquals("metadata three", fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.metadata)
+    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
+    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
+    assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
 
     assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset)
     assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset)
     assertEquals(44L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.offset)
     assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset)
-    assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
-    assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
+    assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
+    assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
   }
 
   @Test
@@ -178,36 +172,21 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicAndPartition.topic, 0, 1000)
     assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
 
-    val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+    val commitRequest = OffsetCommitRequest("test-group", mutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
       metadata=random.nextString(server.config.offsetMetadataMaxSize)
     )))
     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
 
-    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
 
-    val commitRequest1 = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+    val commitRequest1 = OffsetCommitRequest(group, mutable.Map(topicAndPartition -> OffsetAndMetadata(
       offset=42L,
       metadata=random.nextString(server.config.offsetMetadataMaxSize + 1)
     )))
     val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
 
-    assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.requestInfo.get(topicAndPartition).get)
-
-  }
+    assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
 
-  @Test
-  def testNullMetadata() {
-    val topicAndPartition = TopicAndPartition("null-metadata", 0)
-    val expectedReplicaAssignment = Map(0  -> List(1))
-    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topicAndPartition.topic, expectedReplicaAssignment)
-    var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicAndPartition.topic, 0, 1000)
-    assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-    val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
-      offset=42L,
-      metadata=null
-    )))
-    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
-    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1317b4c..22bb6f2 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -86,12 +86,15 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
+    val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
+
     val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
 
     // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
     // don't provide replica or leader callbacks since they will not be tested here
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
+    val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
+
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
     apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
     EasyMock.replay(partitionStateInfo)
@@ -159,10 +162,12 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
     EasyMock.replay(replicaManager)
 
+    val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
+
     val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
 
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
+    val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
     val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
     apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
     EasyMock.replay(partitionStateInfo)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/migration_tool_testsuite/migration_tool_test.py
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/migration_tool_test.py b/system_test/migration_tool_testsuite/migration_tool_test.py
index 2fecd19..2386a58 100644
--- a/system_test/migration_tool_testsuite/migration_tool_test.py
+++ b/system_test/migration_tool_testsuite/migration_tool_test.py
@@ -171,7 +171,7 @@ class MigrationToolTest(ReplicationUtils, SetupUtils):
                 time.sleep(5)
 
                 self.log_message("creating topics")
-                kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
+                kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
                 self.anonLogger.info("sleeping for 5s")
                 time.sleep(5)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/README
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/README b/system_test/mirror_maker/README
deleted file mode 100644
index da53c14..0000000
--- a/system_test/mirror_maker/README
+++ /dev/null
@@ -1,22 +0,0 @@
-This test replicates messages from two source kafka clusters into one target
-kafka cluster using the mirror-maker tool.  At the end, the messages produced
-at the source brokers should match that at the target brokers.
-
-To run this test, do
-bin/run-test.sh
-
-In the event of failure, by default the brokers and zookeepers remain running
-to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
-change this behavior by setting the action_on_fail flag in the script to "exit"
-or "proceed", in which case a snapshot of all the logs and directories is
-placed in the test's base directory.
-
-It is a good idea to run the test in a loop. E.g.:
-
-:>/tmp/mirrormaker_test.log
-for i in {1..10}; do echo "run $i"; ./bin/run-test.sh 2>1 >> /tmp/mirrormaker_test.log; done
-tail -F /tmp/mirrormaker_test.log
-
-grep -ic passed /tmp/mirrormaker_test.log
-grep -ic failed /tmp/mirrormaker_test.log
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/bin/expected.out
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/bin/expected.out b/system_test/mirror_maker/bin/expected.out
deleted file mode 100644
index 0a1bbaf..0000000
--- a/system_test/mirror_maker/bin/expected.out
+++ /dev/null
@@ -1,18 +0,0 @@
-start the servers ...
-start producing messages ...
-wait for consumer to finish consuming ...
-[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool)
-[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool)
-[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool)
-thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec
-[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool)
-[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
-[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
-[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
-Total Num Messages: 400000 bytes: 79859641 in 22.93 secs
-Messages/sec: 17444.3960
-MB/sec: 3.3214
-test passed
-stopping the servers
-bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
-bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated              $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/bin/run-test.sh
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/bin/run-test.sh b/system_test/mirror_maker/bin/run-test.sh
deleted file mode 100644
index e5e6c08..0000000
--- a/system_test/mirror_maker/bin/run-test.sh
+++ /dev/null
@@ -1,357 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-readonly num_messages=10000
-readonly message_size=100
-readonly action_on_fail="proceed"
-# readonly action_on_fail="wait"
-
-readonly test_start_time="$(date +%s)"
-
-readonly base_dir=$(dirname $0)/..
-
-info() {
-    echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
-}
-
-kill_child_processes() {
-    isTopmost=$1
-    curPid=$2
-    childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
-    for childPid in $childPids
-    do
-        kill_child_processes 0 $childPid
-    done
-    if [ $isTopmost -eq 0 ]; then
-        kill -15 $curPid 2> /dev/null
-    fi
-}
-
-cleanup() {
-    info "cleaning up"
-
-    pid_zk_source1=
-    pid_zk_source2=
-    pid_zk_target=
-    pid_kafka_source_1_1=
-    pid_kafka_source_1_2=
-    pid_kafka_source_2_1=
-    pid_kafka_source_2_2=
-    pid_kafka_target_1_1=
-    pid_kafka_target_1_2=
-    pid_producer=
-    pid_mirrormaker_1=
-    pid_mirrormaker_2=
-
-    rm -rf /tmp/zookeeper*
-
-    rm -rf /tmp/kafka*
-}
-
-begin_timer() {
-    t_begin=$(date +%s)
-}
-
-end_timer() {
-    t_end=$(date +%s)
-}
-
-start_zk() {
-    info "starting zookeepers"
-    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_1.properties 2>&1 > $base_dir/zookeeper_source-1.log &
-    pid_zk_source1=$!
-    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_2.properties 2>&1 > $base_dir/zookeeper_source-2.log &
-    pid_zk_source2=$!
-    $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
-    pid_zk_target=$!
-}
-
-start_source_servers() {
-    info "starting source cluster"
-
-    JMX_PORT=1111 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_1.properties 2>&1 > $base_dir/kafka_source-1-1.log &
-    pid_kafka_source_1_1=$!
-    JMX_PORT=2222 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_2.properties 2>&1 > $base_dir/kafka_source-1-2.log &
-    pid_kafka_source_1_2=$!
-    JMX_PORT=3333 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_1.properties 2>&1 > $base_dir/kafka_source-2-1.log &
-    pid_kafka_source_2_1=$!
-    JMX_PORT=4444 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_2.properties 2>&1 > $base_dir/kafka_source-2-2.log &
-    pid_kafka_source_2_2=$!
-}
-
-start_target_servers() {
-    info "starting mirror cluster"
-    JMX_PORT=5555 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_1.properties 2>&1 > $base_dir/kafka_target-1-1.log &
-    pid_kafka_target_1_1=$!
-    JMX_PORT=6666 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_2.properties 2>&1 > $base_dir/kafka_target-1-2.log &
-    pid_kafka_target_1_2=$!
-}
-
-shutdown_servers() {
-    info "stopping mirror-maker"
-    if [ "x${pid_mirrormaker_1}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_1}; fi
-    # sleep to avoid rebalancing during shutdown
-    sleep 2
-    if [ "x${pid_mirrormaker_2}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_2}; fi
-
-    info "stopping producer"
-    if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
-
-    info "shutting down target servers"
-    if [ "x${pid_kafka_target_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_1}; fi
-    if [ "x${pid_kafka_target_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_2}; fi
-    sleep 2
-
-    info "shutting down source servers"
-    if [ "x${pid_kafka_source_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_1}; fi
-    if [ "x${pid_kafka_source_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_2}; fi
-    if [ "x${pid_kafka_source_2_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_1}; fi
-    if [ "x${pid_kafka_source_2_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_2}; fi
-
-    info "shutting down zookeeper servers"
-    if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
-    if [ "x${pid_zk_source1}" != "x" ]; then kill_child_processes 0 ${pid_zk_source1}; fi
-    if [ "x${pid_zk_source2}" != "x" ]; then kill_child_processes 0 ${pid_zk_source2}; fi
-}
-
-start_producer() {
-    topic=$1
-    zk=$2
-    info "start producing messages for topic $topic to zookeeper $zk ..."
-    $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk --topics $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
-    pid_producer=$!
-}
-
-# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
-wait_partition_done() {
-    n_tuples=$(($# / 3))
-
-    i=1
-    while (($#)); do
-        kafka_server[i]=$1
-        topic[i]=$2
-        partitionid[i]=$3
-        prev_offset[i]=0
-        info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
-        i=$((i+1))
-        shift 3
-    done
-
-    all_done=0
-
-    # set -x
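-    # Poll every few seconds; a partition is considered done once its latest offset
-    # (reported by GetOffsetShell) stops changing between successive polls.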
-    while [[ $all_done != 1 ]]; do
-        sleep 4
-        i=$n_tuples
-        all_done=1
-        for ((i=1; i <= $n_tuples; i++)); do
-            cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
-            if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
-                all_done=0
-                prev_offset[i]=$cur_size
-            fi
-        done
-    done
-
-}
-
-cmp_logs() {
-    topic=$1
-    info "comparing source and target logs for topic $topic"
-    source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
-    source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
-    source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
-    source_part3_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
-    target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
-    target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9095 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
-    if [ "x$source_part0_size" == "x" ]; then source_part0_size=0; fi
-    if [ "x$source_part1_size" == "x" ]; then source_part1_size=0; fi
-    if [ "x$source_part2_size" == "x" ]; then source_part2_size=0; fi
-    if [ "x$source_part3_size" == "x" ]; then source_part3_size=0; fi
-    if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
-    if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
-    expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size + $source_part3_size))
-    actual_size=$(($target_part0_size + $target_part1_size))
-    if [ "x$expected_size" != "x$actual_size" ]
-    then
-        info "source size: $expected_size target size: $actual_size"
-        return 1
-    else
-        return 0
-    fi
-}
-
-take_fail_snapshot() {
-    snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
-    mkdir $snapshot_dir
-    for dir in /tmp/zookeeper_source{1..2} /tmp/zookeeper_target /tmp/kafka-source-{1..2}-{1..2}-logs /tmp/kafka-target-1-{1..2}-logs; do
-        if [ -d $dir ]; then
-            cp -r $dir $snapshot_dir
-        fi
-    done
-}
-
-# Usage: process_test_result <result> <action_on_fail>
-# result: last test result
-# action_on_fail: (exit|wait|proceed)
-# ("wait" is useful if you want to troubleshoot using zookeeper)
-process_test_result() {
-    result=$1
-    if [ $1 -eq 0 ]; then
-        info "test passed"
-    else
-        info "test failed"
-        case "$2" in
-            "wait") info "waiting: hit Ctrl-c to quit"
-                wait
-                ;;
-            "exit") shutdown_servers
-                take_fail_snapshot
-                exit $result
-                ;;
-            *) shutdown_servers
-                take_fail_snapshot
-                info "proceeding"
-                ;;
-        esac
-    fi
-}
-
-test_whitelists() {
-    info "### Testing whitelists"
-    snapshot_prefix="whitelist-test"
-
-    cleanup
-    start_zk
-    start_source_servers
-    start_target_servers
-    sleep 4
-
-    info "starting mirror makers"
-    JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 > $base_dir/kafka_mirrormaker_1.log 2>&1 &
-    pid_mirrormaker_1=$!
-    JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 > $base_dir/kafka_mirrormaker_2.log 2>&1 &
-    pid_mirrormaker_2=$!
-
-    begin_timer
-
-    start_producer whitetopic01 localhost:2181
-    start_producer whitetopic01 localhost:2182
-    info "waiting for whitetopic01 producers to finish producing ..."
-    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0 kafka://localhost:9092 whitetopic01 0 kafka://localhost:9093 whitetopic01 0
-
-    start_producer whitetopic02 localhost:2181
-    start_producer whitetopic03 localhost:2181
-    start_producer whitetopic04 localhost:2182
-    info "waiting for whitetopic02,whitetopic03,whitetopic04 producers to finish producing ..."
-    wait_partition_done kafka://localhost:9090 whitetopic02 0 kafka://localhost:9091 whitetopic02 0 kafka://localhost:9090 whitetopic03 0 kafka://localhost:9091 whitetopic03 0 kafka://localhost:9092 whitetopic04 0 kafka://localhost:9093 whitetopic04 0
-
-    start_producer blacktopic01 localhost:2182
-    info "waiting for blacktopic01 producer to finish producing ..."
-    wait_partition_done kafka://localhost:9092 blacktopic01 0 kafka://localhost:9093 blacktopic01 0
-
-    info "waiting for consumer to finish consuming ..."
-
-    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0 kafka://localhost:9094 whitetopic02 0 kafka://localhost:9095 whitetopic02 0 kafka://localhost:9094 whitetopic03 0 kafka://localhost:9095 whitetopic03 0 kafka://localhost:9094 whitetopic04 0 kafka://localhost:9095 whitetopic04 0
-
-    end_timer
-    info "embedded consumer took $((t_end - t_begin)) seconds"
-
-    sleep 2
-
-    # if [[ -d /tmp/kafka-target-1-1-logs/blacktopic01 || /tmp/kafka-target-1-2-logs/blacktopic01 ]]; then
-    #     echo "blacktopic01 found on target cluster"
-    #     result=1
-    # else
-    #     cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
-    #     result=$?
-    # fi
-
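-    # blacktopic01 is not matched by the whitelist, so this comparison is informational only;
-    # its return value is not checked. The test result comes from the whitetopic comparisons below.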
-    cmp_logs blacktopic01
-
-    cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
-    result=$?
-
-    return $result
-}
-
-test_blacklists() {
-    info "### Testing blacklists"
-    snapshot_prefix="blacklist-test"
-    cleanup
-    start_zk
-    start_source_servers
-    start_target_servers
-    sleep 4
-
-    info "starting mirror maker"
-    $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/blacklisttest.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num.streams 2 > $base_dir/kafka_mirrormaker_1.log 2>&1 &
-    pid_mirrormaker_1=$!
-
-    start_producer blacktopic01 localhost:2181
-    start_producer blacktopic02 localhost:2181
-    info "waiting for producer to finish producing blacktopic01,blacktopic02 ..."
-    wait_partition_done kafka://localhost:9090 blacktopic01 0 kafka://localhost:9091 blacktopic01 0 kafka://localhost:9090 blacktopic02 0 kafka://localhost:9091 blacktopic02 0
-
-    begin_timer
-
-    start_producer whitetopic01 localhost:2181
-    info "waiting for producer to finish producing whitetopic01 ..."
-    wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0
-
-    info "waiting for consumer to finish consuming ..."
-    wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0
-
-    end_timer
-
-    info "embedded consumer took $((t_end - t_begin)) seconds"
-
-    sleep 2
-
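-    # cmp_logs returns 0 when source and target sizes match; for a blacklisted topic a match
-    # means it leaked to the target cluster, so the test fails.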
-    cmp_logs blacktopic01 || cmp_logs blacktopic02
-    if [ $? -eq 0 ]; then
-        return 1
-    fi
-    
-    cmp_logs whitetopic01
-    return $?
-}
-
-# main test begins
-
-echo "Test-$test_start_time"
-
-# Ctrl-c trap. Catches INT signal
-trap "shutdown_servers; exit 0" INT
-
-test_whitelists
-result=$?
-
-process_test_result $result $action_on_fail
-
-shutdown_servers
- 
-sleep 2
- 
-test_blacklists
-result=$?
-
-process_test_result $result $action_on_fail
-
-shutdown_servers
-
-exit $result
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/blacklisttest.consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/blacklisttest.consumer.properties b/system_test/mirror_maker/config/blacklisttest.consumer.properties
deleted file mode 100644
index ff12015..0000000
--- a/system_test/mirror_maker/config/blacklisttest.consumer.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-#consumer group id
-group.id=group1
-shallow.iterator.enable=true
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/mirror_producer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/mirror_producer.properties b/system_test/mirror_maker/config/mirror_producer.properties
deleted file mode 100644
index aa8be65..0000000
--- a/system_test/mirror_maker/config/mirror_producer.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2183
-# broker.list=1:localhost:9094,2:localhost:9095
-
-# timeout in ms for connecting to zookeeper
-# zk.connection.timeout.ms=1000000
-
-producer.type=async
-
-# to avoid dropping events if the queue is full, wait indefinitely
-queue.enqueue.timeout.ms=-1
-
-num.producers.per.broker=2
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_source_1_1.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_source_1_1.properties b/system_test/mirror_maker/config/server_source_1_1.properties
deleted file mode 100644
index 2f070a7..0000000
--- a/system_test/mirror_maker/config/server_source_1_1.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=1
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost.  If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9090
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-source-1-1-logs
-
-# the send buffer used by the socket server 
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=10000000
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based log flush scheduler interval in ms
-log.flush.scheduler.interval.ms=1000
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_source_1_2.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_source_1_2.properties b/system_test/mirror_maker/config/server_source_1_2.properties
deleted file mode 100644
index f9353e8..0000000
--- a/system_test/mirror_maker/config/server_source_1_2.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=2
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost.  If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9091
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-source-1-2-logs
-
-# the send buffer used by the socket server 
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based log flush scheduler interval in ms
-log.flush.scheduler.interval.ms=1000
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_source_2_1.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_source_2_1.properties b/system_test/mirror_maker/config/server_source_2_1.properties
deleted file mode 100644
index daa01ad..0000000
--- a/system_test/mirror_maker/config/server_source_2_1.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=1
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost.  If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9092
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-source-2-1-logs
-
-# the send buffer used by the socket server 
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2182
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based log flush scheduler interval in ms
-log.flush.scheduler.interval.ms=1000
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_source_2_2.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_source_2_2.properties b/system_test/mirror_maker/config/server_source_2_2.properties
deleted file mode 100644
index be6fdfc..0000000
--- a/system_test/mirror_maker/config/server_source_2_2.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=2
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost.  If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9093
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-source-2-2-logs
-
-# the send buffer used by the socket server 
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2182
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based log flush scheduler interval in ms
-log.flush.scheduler.interval.ms=1000
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_target_1_1.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_target_1_1.properties b/system_test/mirror_maker/config/server_target_1_1.properties
deleted file mode 100644
index d37955a..0000000
--- a/system_test/mirror_maker/config/server_target_1_1.properties
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=1
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost.  If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9094
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-target-1-1-logs
-
-# the send buffer used by the socket server 
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2183
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based log flush scheduler interval in ms
-log.flush.scheduler.interval.ms=1000
-
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_target_1_2.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_target_1_2.properties b/system_test/mirror_maker/config/server_target_1_2.properties
deleted file mode 100644
index aa7546c..0000000
--- a/system_test/mirror_maker/config/server_target_1_2.properties
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=2
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost.  If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9095
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-target-1-2-logs
-
-# the send buffer used by the socket server 
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2183
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based log flush scheduler interval in ms
-log.flush.scheduler.interval.ms=1000
-
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties b/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
deleted file mode 100644
index ff12015..0000000
--- a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-#consumer group id
-group.id=group1
-shallow.iterator.enable=true
-