You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/03/14 23:14:49 UTC
[3/5] KAFKA-1012; Consumer offset management in Kafka;
patched by Tejas Patil and Joel Koshy;
feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram
Subramanian, Joe Stein, Chris Riccomini
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala
new file mode 100644
index 0000000..83317f0
--- /dev/null
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -0,0 +1,291 @@
+package other.kafka
+
+import org.I0Itec.zkclient.ZkClient
+import kafka.api._
+import kafka.utils.{ShutdownableThread, ZKStringSerializer}
+import scala.collection._
+import kafka.client.ClientUtils
+import joptsimple.OptionParser
+import kafka.common.{ErrorMapping, OffsetAndMetadata, TopicAndPartition}
+import kafka.network.BlockingChannel
+import scala.util.Random
+import java.io.IOException
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+import java.util.concurrent.TimeUnit
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.atomic.AtomicInteger
+import java.nio.channels.ClosedByInterruptException
+
+
+object TestOffsetManager {
+
+ val random = new Random
+ val SocketTimeoutMs = 10000
+
+  /**
+   * Periodically prints aggregate and per-thread commit statistics, followed by the fetch thread's statistics.
+   *
+   * @param reportingIntervalMs pause, in milliseconds, after each report
+   * @param commitThreads       commit threads whose counters and timers are summarized
+   * @param fetchThread         fetch thread whose stats line is printed last
+   */
+  class StatsThread(reportingIntervalMs: Long, commitThreads: Seq[CommitThread], fetchThread: FetchThread)
+    extends ShutdownableThread("stats-thread") {
+
+    /** Prints one full report (aggregate commit stats, one line per commit thread, fetch stats) to stdout. */
+    def printStats() {
+      println("--------------------------------------------------------------------------------")
+      println("Aggregate stats for commits:")
+      if (commitThreads.isEmpty) {
+        // max/min over an empty sequence throw, so report the empty case explicitly.
+        println("No commit threads.")
+      }
+      else {
+        val totalErrors = commitThreads.map(_.numErrors.get).sum
+        val totalCommits = commitThreads.map(_.numCommits.get).sum
+        val maxOfMax = commitThreads.map(_.timer.max()).max
+        val minOfMin = commitThreads.map(_.timer.min()).min
+        // Unweighted average of the per-thread means.
+        val meanOfMeans = commitThreads.map(_.timer.mean()).sum / commitThreads.size
+        println("Error count: %d; Max:%f; Min: %f; Mean: %f; Commit count: %d".format(
+          totalErrors, maxOfMax, minOfMin, meanOfMeans, totalCommits))
+      }
+      println("--------------------------------------------------------------------------------")
+      commitThreads.foreach(t => println(t.stats))
+      println(fetchThread.stats)
+    }
+
+    /** One iteration of the thread loop: print a report, then pause for the reporting interval. */
+    override def doWork() {
+      printStats()
+      Thread.sleep(reportingIntervalMs)
+    }
+
+  }
+
+  /**
+   * Repeatedly sends an offset commit request for group "group-<id>" covering partitions 1..partitionCount of
+   * "topic-<id>", incrementing the committed offset after every successful round trip.
+   *
+   * @param id               index of this thread; used to derive the group, topic, metadata and timer names
+   * @param partitionCount   number of partitions included in every commit request
+   * @param commitIntervalMs pause, in milliseconds, after each commit attempt
+   * @param zkClient         ZooKeeper client passed to ClientUtils.channelToOffsetManager
+   */
+  class CommitThread(id: Int, partitionCount: Int, commitIntervalMs: Long, zkClient: ZkClient)
+    extends ShutdownableThread("commit-thread")
+    with KafkaMetricsGroup {
+
+    private val group = "group-" + id
+    private val metadata = "Metadata from commit thread " + id
+    private var offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+    private var offset = 0L
+    val numErrors = new AtomicInteger(0)
+    val numCommits = new AtomicInteger(0)
+    // The timer name includes the thread id so that every commit thread gets its own timer; StatsThread
+    // takes max/min/mean across the per-thread timers.
+    // NOTE(review): this assumes newTimer resolves equal names to one shared timer - confirm against
+    // KafkaMetricsGroup / the metrics registry.
+    val timer = newTimer("commit-thread-" + id, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+    private val commitTimer = new KafkaTimer(timer)
+    val shutdownLock = new Object
+
+    /** Replaces the offsets channel with a fresh one if the current channel is no longer connected. */
+    private def ensureConnected() {
+      if (!offsetsChannel.isConnected)
+        offsetsChannel = ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs)
+    }
+
+    /** One iteration of the thread loop: send one commit, read its response, then pause. */
+    override def doWork() {
+      val offsets = (1 to partitionCount).map { partition =>
+        TopicAndPartition("topic-" + id, partition) -> OffsetAndMetadata(offset, metadata)
+      }
+      val commitRequest = OffsetCommitRequest(group, mutable.Map(offsets: _*))
+      try {
+        ensureConnected()
+        offsetsChannel.send(commitRequest)
+        // Counted as soon as the request has been sent, i.e. before the response is read.
+        numCommits.getAndIncrement
+        // Only receiving and parsing the response runs inside the timed block.
+        commitTimer.time {
+          val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer)
+          if (response.commitStatus.exists(_._2 != ErrorMapping.NoError)) numErrors.getAndIncrement
+        }
+        offset += 1
+      }
+      catch {
+        case _: ClosedByInterruptException =>
+          // Raised when the thread is interrupted during channel I/O; drop the channel quietly.
+          offsetsChannel.disconnect()
+        case e2: IOException =>
+          println("Commit thread %d: Error while committing offsets to %s:%d for group %s due to %s.".format(id, offsetsChannel.host, offsetsChannel.port, group, e2))
+          offsetsChannel.disconnect()
+      }
+      finally {
+        Thread.sleep(commitIntervalMs)
+      }
+    }
+
+    /** Stops the thread, waits for it to finish, closes the channel and prints the last offset value. */
+    override def shutdown() {
+      super.shutdown()
+      awaitShutdown()
+      offsetsChannel.disconnect()
+      println("Commit thread %d ended. Last committed offset: %d.".format(id, offset))
+    }
+
+    /** One-line summary of this thread's error count, timer max/min/mean and commit count. */
+    def stats = {
+      "Commit thread %d :: Error count: %d; Max:%f; Min: %f; Mean: %f; Commit count: %d"
+        .format(id, numErrors.get(), timer.max(), timer.min(), timer.mean(), numCommits.get())
+    }
+  }
+
+  /**
+   * Repeatedly picks a random group "group-<n>" with 0 <= n < numGroups, asks a broker for that group's
+   * coordinator, and fetches the offset of partition 1 of "topic-<n>" over a per-coordinator channel.
+   *
+   * @param numGroups       number of groups to choose from; must be positive
+   * @param fetchIntervalMs pause, in milliseconds, after each fetch attempt
+   * @param zkClient        ZooKeeper client passed to the ClientUtils channel factories
+   */
+  class FetchThread(numGroups: Int, fetchIntervalMs: Long, zkClient: ZkClient)
+    extends ShutdownableThread("fetch-thread")
+    with KafkaMetricsGroup {
+
+    private val timer = newTimer("fetch-thread", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
+    private val fetchTimer = new KafkaTimer(timer)
+
+    // Open offset-manager channels keyed by coordinator broker id (-1 when no coordinator was returned).
+    private val channels = mutable.Map[Int, BlockingChannel]()
+    private var metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+
+    private val numErrors = new AtomicInteger(0)
+
+    /** One iteration of the thread loop: look up the coordinator, fetch one offset, then pause. */
+    override def doWork() {
+      // nextInt(bound) is always within [0, bound); nextInt().abs % bound is negative for Int.MinValue.
+      val id = random.nextInt(numGroups)
+      val group = "group-" + id
+      try {
+        metadataChannel.send(ConsumerMetadataRequest(group))
+        val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinator.map(_.id).getOrElse(-1)
+
+        // Reuse the cached channel for this coordinator, opening and caching one on first use.
+        val channel = channels.getOrElseUpdate(coordinatorId,
+          ClientUtils.channelToOffsetManager(group, zkClient, SocketTimeoutMs))
+
+        try {
+          // send the offset fetch request
+          val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition("topic-"+id, 1)))
+          channel.send(fetchRequest)
+
+          // Only receiving and parsing the response runs inside the timed block.
+          fetchTimer.time {
+            val response = OffsetFetchResponse.readFrom(channel.receive().buffer)
+            if (response.requestInfo.exists(_._2.error != ErrorMapping.NoError)) {
+              numErrors.getAndIncrement
+            }
+          }
+        }
+        catch {
+          case _: ClosedByInterruptException =>
+            // Raised when the thread is interrupted during channel I/O; drop the channel quietly.
+            channel.disconnect()
+            channels.remove(coordinatorId)
+          case e2: IOException =>
+            println("Error while fetching offset from %s:%d due to %s.".format(channel.host, channel.port, e2))
+            channel.disconnect()
+            channels.remove(coordinatorId)
+        }
+      }
+      catch {
+        // Reached for I/O failures outside the inner try: the metadata round trip or opening a channel.
+        case e: IOException =>
+          println("Error while querying %s:%d - shutting down query channel.".format(metadataChannel.host, metadataChannel.port))
+          metadataChannel.disconnect()
+          println("Creating new query channel.")
+          metadataChannel = ClientUtils.channelToAnyBroker(zkClient, SocketTimeoutMs)
+      }
+      finally {
+        Thread.sleep(fetchIntervalMs)
+      }
+
+    }
+
+    /** Stops the thread, waits for it to finish, then closes every cached channel and the metadata channel. */
+    override def shutdown() {
+      super.shutdown()
+      awaitShutdown()
+      channels.foreach(_._2.disconnect())
+      metadataChannel.disconnect()
+    }
+
+    /** One-line summary of the error count and the fetch timer's max/min/mean/count. */
+    def stats = {
+      "Fetch thread :: Error count: %d; Max:%f; Min: %f; Mean: %f; Fetch count: %d"
+        .format(numErrors.get(), timer.max(), timer.min(), timer.mean(), timer.count())
+    }
+  }
+
+  /**
+   * Entry point: parses the command-line options, starts the commit, fetch and stats threads, and blocks
+   * until they finish. A JVM shutdown hook stops all threads and prints a final stats report.
+   *
+   * @param args command-line arguments; pass --help for the list of options
+   */
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val zookeeperOpt = parser.accepts("zookeeper", "The ZooKeeper connection URL.")
+      .withRequiredArg
+      .describedAs("ZooKeeper URL")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo("localhost:2181")
+
+    val commitIntervalOpt = parser.accepts("commit-interval-ms", "Offset commit interval.")
+      .withRequiredArg
+      .describedAs("interval")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(100)
+
+    val fetchIntervalOpt = parser.accepts("fetch-interval-ms", "Offset fetch interval.")
+      .withRequiredArg
+      .describedAs("interval")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1000)
+
+    val numPartitionsOpt = parser.accepts("partition-count", "Number of partitions per commit.")
+      .withRequiredArg
+      .describedAs("partitions")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
+    val numThreadsOpt = parser.accepts("thread-count", "Number of commit threads.")
+      .withRequiredArg
+      .describedAs("threads")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
+    val reportingIntervalOpt = parser.accepts("reporting-interval-ms", "Interval at which stats are reported.")
+      .withRequiredArg
+      .describedAs("interval (ms)")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3000)
+
+    val helpOpt = parser.accepts("help", "Print this message.")
+
+    val options = parser.parse(args : _*)
+
+    if (options.has(helpOpt)) {
+      parser.printHelpOn(System.out)
+      System.exit(0)
+    }
+
+    val commitIntervalMs = options.valueOf(commitIntervalOpt).intValue()
+    val fetchIntervalMs = options.valueOf(fetchIntervalOpt).intValue()
+    val threadCount = options.valueOf(numThreadsOpt).intValue()
+    val partitionCount = options.valueOf(numPartitionsOpt).intValue()
+    val zookeeper = options.valueOf(zookeeperOpt)
+    val reportingIntervalMs = options.valueOf(reportingIntervalOpt).intValue()
+    println("Commit thread count: %d; Partition count: %d, Commit interval: %d ms; Fetch interval: %d ms; Reporting interval: %d ms"
+      .format(threadCount, partitionCount, commitIntervalMs, fetchIntervalMs, reportingIntervalMs))
+
+    // Declared outside the try so that cleanShutdown can see whatever was created before a failure.
+    var zkClient: ZkClient = null
+    var commitThreads: Seq[CommitThread] = Seq()
+    var fetchThread: FetchThread = null
+    var statsThread: StatsThread = null
+
+    // Stops and joins every thread that was created, then closes the ZooKeeper client. Each resource is
+    // null-checked because startup may have failed part-way. Called from both the shutdown hook and the
+    // finally block below.
+    def cleanShutdown() {
+      commitThreads.foreach(_.shutdown())
+      commitThreads.foreach(_.join())
+      if (fetchThread != null) {
+        fetchThread.shutdown()
+        fetchThread.join()
+      }
+      if (statsThread != null) {
+        statsThread.shutdown()
+        statsThread.join()
+      }
+      if (zkClient != null)
+        zkClient.close()
+    }
+
+    try {
+      zkClient = new ZkClient(zookeeper, 6000, 2000, ZKStringSerializer)
+      commitThreads = (0 to (threadCount-1)).map { threadId =>
+        new CommitThread(threadId, partitionCount, commitIntervalMs, zkClient)
+      }
+
+      fetchThread = new FetchThread(threadCount, fetchIntervalMs, zkClient)
+
+      // Assign the outer var; a local val here would shadow it and cleanShutdown would never stop the
+      // stats thread.
+      statsThread = new StatsThread(reportingIntervalMs, commitThreads, fetchThread)
+
+      Runtime.getRuntime.addShutdownHook(new Thread() {
+        override def run() {
+          cleanShutdown()
+          statsThread.printStats()
+        }
+      })
+
+      commitThreads.foreach(_.start())
+
+      fetchThread.start()
+
+      statsThread.start()
+
+      commitThreads.foreach(_.join())
+      fetchThread.join()
+      statsThread.join()
+    }
+    catch {
+      case e: Throwable =>
+        // Concatenate: println("Error: ", e) would print the two arguments as a tuple.
+        println("Error: " + e)
+    }
+    finally {
+      cleanShutdown()
+    }
+
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala b/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
deleted file mode 100644
index 31534ca..0000000
--- a/core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka
-
-import consumer._
-import utils.Utils
-import java.util.concurrent.CountDownLatch
-
-object TestZKConsumerOffsets {
- def main(args: Array[String]): Unit = {
- if(args.length < 1) {
- println("USAGE: " + TestZKConsumerOffsets.getClass.getName + " consumer.properties topic latest")
- System.exit(1)
- }
- println("Starting consumer...")
- val topic = args(1)
- val autoOffsetReset = args(2)
- val props = Utils.loadProps(args(0))
- props.put("auto.offset.reset", "largest")
-
- val config = new ConsumerConfig(props)
- val consumerConnector: ConsumerConnector = Consumer.create(config)
- val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
- var threadList = List[ConsumerThread]()
- for ((topic, streamList) <- topicMessageStreams)
- for (stream <- streamList)
- threadList ::= new ConsumerThread(stream)
-
- for (thread <- threadList)
- thread.start
-
- // attach shutdown handler to catch control-c
- Runtime.getRuntime().addShutdownHook(new Thread() {
- override def run() = {
- consumerConnector.shutdown
- threadList.foreach(_.shutdown)
- println("consumer threads shutted down")
- }
- })
- }
-}
-
-private class ConsumerThread(stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread {
- val shutdownLatch = new CountDownLatch(1)
-
- override def run() {
- println("Starting consumer thread..")
- for (messageAndMetadata <- stream) {
- println("consumed: " + new String(messageAndMetadata.message, "UTF-8"))
- }
- shutdownLatch.countDown
- println("thread shutdown !" )
- }
-
- def shutdown() {
- shutdownLatch.await
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index dbe078c..6db76a5 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -142,13 +142,6 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
val errorCode = offsetResponse.partitionErrorAndOffsets(topicAndPartition).error
assertTrue("Offset request should fail with UnknownTopicOrPartitionCode", errorCode == ErrorMapping.UnknownTopicOrPartitionCode)
- // test if offset fetch requests fail during delete topic
- val offsetFetchRequest = new OffsetFetchRequest("test-group", Seq(topicAndPartition))
- val offsetFetchResponse = consumer.fetchOffsets(offsetFetchRequest)
- val offsetFetchErrorCode = offsetFetchResponse.requestInfo(topicAndPartition).error
- assertTrue("Offset fetch request should fail with UnknownTopicOrPartitionCode",
- offsetFetchErrorCode == ErrorMapping.UnknownTopicOrPartitionCode)
- // TODO: test if offset commit requests fail during delete topic
}
// restart follower replica
follower.startup()
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index eb274d1..5378446 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -23,12 +23,12 @@ import junit.framework.Assert._
import java.nio.ByteBuffer
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.cluster.Broker
-import collection.mutable._
-import kafka.common.{TopicAndPartition, ErrorMapping, OffsetMetadataAndError}
+import kafka.common.{OffsetAndMetadata, TopicAndPartition, ErrorMapping, OffsetMetadataAndError}
import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.utils.SystemTime
-object SerializationTestUtils{
+object SerializationTestUtils {
private val topic1 = "test1"
private val topic2 = "test2"
private val leader1 = 0
@@ -147,17 +147,15 @@ object SerializationTestUtils{
}
def createTestOffsetCommitRequest: OffsetCommitRequest = {
- new OffsetCommitRequest("group 1", collection.immutable.Map(
- TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="some metadata"),
- TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(offset=100L, metadata=OffsetMetadataAndError.NoMetadata)
+ new OffsetCommitRequest("group 1", collection.mutable.Map(
+ TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
+ TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
))
}
def createTestOffsetCommitResponse: OffsetCommitResponse = {
- new OffsetCommitResponse(collection.immutable.Map(
- TopicAndPartition(topic1, 0) -> ErrorMapping.NoError,
- TopicAndPartition(topic1, 1) -> ErrorMapping.UnknownTopicOrPartitionCode
- ))
+ new OffsetCommitResponse(collection.immutable.Map(TopicAndPartition(topic1, 0) -> ErrorMapping.NoError,
+ TopicAndPartition(topic1, 1) -> ErrorMapping.NoError))
}
def createTestOffsetFetchRequest: OffsetFetchRequest = {
@@ -170,11 +168,18 @@ object SerializationTestUtils{
def createTestOffsetFetchResponse: OffsetFetchResponse = {
new OffsetFetchResponse(collection.immutable.Map(
TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", ErrorMapping.NoError),
- TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadataAndError.NoMetadata,
- ErrorMapping.UnknownTopicOrPartitionCode)
+ TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetAndMetadata.NoMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
))
}
+ def createConsumerMetadataRequest: ConsumerMetadataRequest = {
+ ConsumerMetadataRequest("group 1", clientId = "client 1")
+ }
+
+ def createConsumerMetadataResponse: ConsumerMetadataResponse = {
+ ConsumerMetadataResponse(Some(brokers.head), ErrorMapping.NoError)
+ }
+
}
class RequestResponseSerializationTest extends JUnitSuite {
@@ -193,114 +198,30 @@ class RequestResponseSerializationTest extends JUnitSuite {
private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse
private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest
private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
-
+ private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
+ private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
@Test
def testSerializationAndDeserialization() {
- var buffer: ByteBuffer = ByteBuffer.allocate(leaderAndIsrRequest.sizeInBytes())
- leaderAndIsrRequest.writeTo(buffer)
- buffer.rewind()
- val deserializedLeaderAndIsrRequest = LeaderAndIsrRequest.readFrom(buffer)
- assertEquals("The original and deserialzed leaderAndISRRequest should be the same", leaderAndIsrRequest,
- deserializedLeaderAndIsrRequest)
-
- buffer = ByteBuffer.allocate(leaderAndIsrResponse.sizeInBytes())
- leaderAndIsrResponse.writeTo(buffer)
- buffer.rewind()
- val deserializedLeaderAndIsrResponse = LeaderAndIsrResponse.readFrom(buffer)
- assertEquals("The original and deserialzed leaderAndISRResponse should be the same", leaderAndIsrResponse,
- deserializedLeaderAndIsrResponse)
-
- buffer = ByteBuffer.allocate(stopReplicaRequest.sizeInBytes())
- stopReplicaRequest.writeTo(buffer)
- buffer.rewind()
- val deserializedStopReplicaRequest = StopReplicaRequest.readFrom(buffer)
- assertEquals("The original and deserialzed stopReplicaRequest should be the same", stopReplicaRequest,
- deserializedStopReplicaRequest)
-
- buffer = ByteBuffer.allocate(stopReplicaResponse.sizeInBytes())
- stopReplicaResponse.writeTo(buffer)
- buffer.rewind()
- val deserializedStopReplicaResponse = StopReplicaResponse.readFrom(buffer)
- assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse,
- deserializedStopReplicaResponse)
-
- buffer = ByteBuffer.allocate(producerRequest.sizeInBytes)
- producerRequest.writeTo(buffer)
- buffer.rewind()
- val deserializedProducerRequest = ProducerRequest.readFrom(buffer)
- assertEquals("The original and deserialzed producerRequest should be the same", producerRequest,
- deserializedProducerRequest)
-
- buffer = ByteBuffer.allocate(producerResponse.sizeInBytes)
- producerResponse.writeTo(buffer)
- buffer.rewind()
- val deserializedProducerResponse = ProducerResponse.readFrom(buffer)
- assertEquals("The original and deserialzed producerResponse should be the same: [%s], [%s]".format(producerResponse, deserializedProducerResponse), producerResponse,
- deserializedProducerResponse)
-
- buffer = ByteBuffer.allocate(fetchRequest.sizeInBytes)
- fetchRequest.writeTo(buffer)
- buffer.rewind()
- val deserializedFetchRequest = FetchRequest.readFrom(buffer)
- assertEquals("The original and deserialzed fetchRequest should be the same", fetchRequest,
- deserializedFetchRequest)
-
- buffer = ByteBuffer.allocate(offsetRequest.sizeInBytes)
- offsetRequest.writeTo(buffer)
- buffer.rewind()
- val deserializedOffsetRequest = OffsetRequest.readFrom(buffer)
- assertEquals("The original and deserialzed offsetRequest should be the same", offsetRequest,
- deserializedOffsetRequest)
-
- buffer = ByteBuffer.allocate(offsetResponse.sizeInBytes)
- offsetResponse.writeTo(buffer)
- buffer.rewind()
- val deserializedOffsetResponse = OffsetResponse.readFrom(buffer)
- assertEquals("The original and deserialzed offsetResponse should be the same", offsetResponse,
- deserializedOffsetResponse)
-
- buffer = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes())
- topicMetadataRequest.writeTo(buffer)
- buffer.rewind()
- val deserializedTopicMetadataRequest = TopicMetadataRequest.readFrom(buffer)
- assertEquals("The original and deserialzed topicMetadataRequest should be the same", topicMetadataRequest,
- deserializedTopicMetadataRequest)
-
- buffer = ByteBuffer.allocate(topicMetadataResponse.sizeInBytes)
- topicMetadataResponse.writeTo(buffer)
- buffer.rewind()
- val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
- assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
- deserializedTopicMetadataResponse)
-
- buffer = ByteBuffer.allocate(offsetCommitRequest.sizeInBytes)
- offsetCommitRequest.writeTo(buffer)
- buffer.rewind()
- val deserializedOffsetCommitRequest = OffsetCommitRequest.readFrom(buffer)
- assertEquals("The original and deserialzed offsetCommitRequest should be the same", offsetCommitRequest,
- deserializedOffsetCommitRequest)
-
- buffer = ByteBuffer.allocate(offsetCommitResponse.sizeInBytes)
- offsetCommitResponse.writeTo(buffer)
- buffer.rewind()
- val deserializedOffsetCommitResponse = OffsetCommitResponse.readFrom(buffer)
- assertEquals("The original and deserialzed offsetCommitResponse should be the same", offsetCommitResponse,
- deserializedOffsetCommitResponse)
-
- buffer = ByteBuffer.allocate(offsetFetchRequest.sizeInBytes)
- offsetFetchRequest.writeTo(buffer)
- buffer.rewind()
- val deserializedOffsetFetchRequest = OffsetFetchRequest.readFrom(buffer)
- assertEquals("The original and deserialzed offsetFetchRequest should be the same", offsetFetchRequest,
- deserializedOffsetFetchRequest)
-
- buffer = ByteBuffer.allocate(offsetFetchResponse.sizeInBytes)
- offsetFetchResponse.writeTo(buffer)
- buffer.rewind()
- val deserializedOffsetFetchResponse = OffsetFetchResponse.readFrom(buffer)
- assertEquals("The original and deserialzed offsetFetchResponse should be the same", offsetFetchResponse,
- deserializedOffsetFetchResponse)
+ val requestsAndResponses =
+ collection.immutable.Seq(leaderAndIsrRequest, leaderAndIsrResponse,
+ stopReplicaRequest, stopReplicaResponse,
+ producerRequest, producerResponse,
+ fetchRequest,
+ offsetRequest, offsetResponse,
+ topicMetadataRequest, topicMetadataResponse,
+ offsetCommitRequest, offsetCommitResponse,
+ offsetFetchRequest, offsetFetchResponse,
+ consumerMetadataRequest, consumerMetadataResponse)
+
+ requestsAndResponses.foreach { original =>
+ val buffer = ByteBuffer.allocate(original.sizeInBytes)
+ original.writeTo(buffer)
+ buffer.rewind()
+ val deserializer = original.getClass.getDeclaredMethod("readFrom", classOf[ByteBuffer])
+ val deserialized = deserializer.invoke(null, buffer)
+ assertEquals("The original and deserialized request/response should be the same.", original, deserialized)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index cf2724b..d903a6f 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -21,6 +21,7 @@ package kafka.consumer
import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
+import kafka.server.OffsetManager
class TopicFilterTest extends JUnitSuite {
@@ -29,19 +30,30 @@ class TopicFilterTest extends JUnitSuite {
def testWhitelists() {
val topicFilter1 = new Whitelist("white1,white2")
- assertTrue(topicFilter1.isTopicAllowed("white2"))
- assertFalse(topicFilter1.isTopicAllowed("black1"))
+ assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = true))
+ assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = false))
+ assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
+ assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
val topicFilter2 = new Whitelist(".+")
- assertTrue(topicFilter2.isTopicAllowed("alltopics"))
-
+ assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
+ assertFalse(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
+ assertTrue(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
+
val topicFilter3 = new Whitelist("white_listed-topic.+")
- assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1"))
- assertFalse(topicFilter3.isTopicAllowed("black1"))
+ assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
+ assertFalse(topicFilter3.isTopicAllowed("black1", excludeInternalTopics = true))
}
@Test
def testBlacklists() {
val topicFilter1 = new Blacklist("black1")
+ assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = true))
+ assertTrue(topicFilter1.isTopicAllowed("white2", excludeInternalTopics = false))
+ assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
+ assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
+
+ assertFalse(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
+ assertTrue(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
index 8fe7259..258dd25 100644
--- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
@@ -115,7 +115,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertEquals(expected_1, actual_1)
// commit consumed offsets
- zkConsumerConnector1.commitOffsets
+ zkConsumerConnector1.commitOffsets()
// create a consumer
val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {
@@ -194,7 +194,7 @@ class ZookeeperConsumerConnectorTest extends JUnit3Suite with KafkaServerTestHar
assertEquals(expected_1, actual_1)
// commit consumed offsets
- zkConsumerConnector1.commitOffsets
+ zkConsumerConnector1.commitOffsets()
// create a consumer
val consumerConfig2 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer2)) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 6a96d80..e632997 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -23,13 +23,13 @@ import junit.framework.Assert._
import java.util.Properties
import kafka.consumer.SimpleConsumer
import org.junit.{After, Before, Test}
-import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
-import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
+import kafka.api.{ConsumerMetadataRequest, OffsetCommitRequest, OffsetFetchRequest}
import kafka.utils.TestUtils._
-import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
+import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, ErrorMapping, TopicAndPartition}
import scala.util.Random
+import scala.collection._
import kafka.admin.AdminUtils
class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -39,6 +39,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
var server: KafkaServer = null
var logSize: Int = 100
val brokerPort: Int = 9099
+ val group = "test-group"
var simpleConsumer: SimpleConsumer = null
var time: Time = new MockTime()
@@ -51,6 +52,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
time = new MockTime()
server = TestUtils.createServer(new KafkaConfig(config), time)
simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client")
+ val consumerMetadataRequest = ConsumerMetadataRequest(group)
+ Stream.continually {
+ val consumerMetadataResponse = simpleConsumer.send(consumerMetadataRequest)
+ consumerMetadataResponse.coordinator.isDefined
+ }.dropWhile(success => {
+ if (!success) Thread.sleep(1000)
+ !success
+ })
}
@After
@@ -72,34 +81,34 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
- val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(offset=42L)))
+ val commitRequest = OffsetCommitRequest("test-group", mutable.Map(topicAndPartition -> OffsetAndMetadata(offset=42L)))
val commitResponse = simpleConsumer.commitOffsets(commitRequest)
- assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
// Fetch it and verify
- val fetchRequest = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+ val fetchRequest = OffsetFetchRequest(group, Seq(topicAndPartition))
val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error)
- //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
+ assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
// Commit a new offset
- val commitRequest1 = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+ val commitRequest1 = OffsetCommitRequest(group, mutable.Map(topicAndPartition -> OffsetAndMetadata(
offset=100L,
metadata="some metadata"
)))
val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
- assertEquals(ErrorMapping.NoError, commitResponse1.requestInfo.get(topicAndPartition).get)
+ assertEquals(ErrorMapping.NoError, commitResponse1.commitStatus.get(topicAndPartition).get)
// Fetch it and verify
- val fetchRequest1 = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+ val fetchRequest1 = OffsetFetchRequest(group, Seq(topicAndPartition))
val fetchResponse1 = simpleConsumer.fetchOffsets(fetchRequest1)
-
+
assertEquals(ErrorMapping.NoError, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
- //assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
+ assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
}
@@ -111,34 +120,19 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
val topic3 = "topic-3"
val topic4 = "topic-4"
- val expectedReplicaAssignment = Map(0 -> List(1))
- // create the topic
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic1, expectedReplicaAssignment)
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic2, expectedReplicaAssignment)
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic3, expectedReplicaAssignment)
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic4, expectedReplicaAssignment)
- var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0, 1000)
- assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
- leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic2, 0, 1000)
- assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
- leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic3, 0, 1000)
- assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
- leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic4, 0, 1000)
- assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
-
- val commitRequest = OffsetCommitRequest("test-group", Map(
- TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="metadata one"),
- TopicAndPartition(topic2, 0) -> OffsetMetadataAndError(offset=43L, metadata="metadata two"),
- TopicAndPartition(topic3, 0) -> OffsetMetadataAndError(offset=44L, metadata="metadata three"),
- TopicAndPartition(topic2, 1) -> OffsetMetadataAndError(offset=45L)
+ val commitRequest = OffsetCommitRequest("test-group", mutable.Map(
+ TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="metadata one"),
+ TopicAndPartition(topic2, 0) -> OffsetAndMetadata(offset=43L, metadata="metadata two"),
+ TopicAndPartition(topic3, 0) -> OffsetAndMetadata(offset=44L, metadata="metadata three"),
+ TopicAndPartition(topic2, 1) -> OffsetAndMetadata(offset=45L)
))
val commitResponse = simpleConsumer.commitOffsets(commitRequest)
- assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get)
- assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get)
- assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get)
- assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic3, 0)).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(TopicAndPartition(topic2, 1)).get)
- val fetchRequest = OffsetFetchRequest("test-group", Seq(
+ val fetchRequest = OffsetFetchRequest(group, Seq(
TopicAndPartition(topic1, 0),
TopicAndPartition(topic2, 0),
TopicAndPartition(topic3, 0),
@@ -152,22 +146,22 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
- assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
- assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
+ assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
+ assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)
- //assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
- //assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)
- //assertEquals("metadata three", fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.metadata)
- //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
- //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
- //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
+ assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
+ assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)
+ assertEquals("metadata three", fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.metadata)
+ assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
+ assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
+ assertEquals(OffsetAndMetadata.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset)
assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset)
assertEquals(44L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.offset)
assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset)
- assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
- assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
+ assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
+ assertEquals(OffsetAndMetadata.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
}
@Test
@@ -178,36 +172,21 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicAndPartition.topic, 0, 1000)
assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
- val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+ val commitRequest = OffsetCommitRequest("test-group", mutable.Map(topicAndPartition -> OffsetAndMetadata(
offset=42L,
metadata=random.nextString(server.config.offsetMetadataMaxSize)
)))
val commitResponse = simpleConsumer.commitOffsets(commitRequest)
- assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
+ assertEquals(ErrorMapping.NoError, commitResponse.commitStatus.get(topicAndPartition).get)
- val commitRequest1 = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+ val commitRequest1 = OffsetCommitRequest(group, mutable.Map(topicAndPartition -> OffsetAndMetadata(
offset=42L,
metadata=random.nextString(server.config.offsetMetadataMaxSize + 1)
)))
val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
- assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.requestInfo.get(topicAndPartition).get)
-
- }
+ assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.commitStatus.get(topicAndPartition).get)
- @Test
- def testNullMetadata() {
- val topicAndPartition = TopicAndPartition("null-metadata", 0)
- val expectedReplicaAssignment = Map(0 -> List(1))
- AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topicAndPartition.topic, expectedReplicaAssignment)
- var leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topicAndPartition.topic, 0, 1000)
- assertTrue("Leader should be elected after topic creation", leaderIdOpt.isDefined)
- val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
- offset=42L,
- metadata=null
- )))
- val commitResponse = simpleConsumer.commitOffsets(commitRequest)
- assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 1317b4c..22bb6f2 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -86,12 +86,15 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
EasyMock.replay(replicaManager)
+ val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
+
val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
// start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
// don't provide replica or leader callbacks since they will not be tested here
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
+ val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
+
val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
EasyMock.replay(partitionStateInfo)
@@ -159,10 +162,12 @@ class SimpleFetchTest extends JUnit3Suite {
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
EasyMock.replay(replicaManager)
+ val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
+
val controller = EasyMock.createMock(classOf[kafka.controller.KafkaController])
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head, controller)
+ val apis = new KafkaApis(requestChannel, replicaManager, offsetManager, zkClient, configs.head.brokerId, configs.head, controller)
val partitionStateInfo = EasyMock.createNiceMock(classOf[PartitionStateInfo])
apis.metadataCache.put(TopicAndPartition(topic, partitionId), partitionStateInfo)
EasyMock.replay(partitionStateInfo)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/migration_tool_testsuite/migration_tool_test.py
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/migration_tool_test.py b/system_test/migration_tool_testsuite/migration_tool_test.py
index 2fecd19..2386a58 100644
--- a/system_test/migration_tool_testsuite/migration_tool_test.py
+++ b/system_test/migration_tool_testsuite/migration_tool_test.py
@@ -171,7 +171,7 @@ class MigrationToolTest(ReplicationUtils, SetupUtils):
time.sleep(5)
self.log_message("creating topics")
- kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/README
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/README b/system_test/mirror_maker/README
deleted file mode 100644
index da53c14..0000000
--- a/system_test/mirror_maker/README
+++ /dev/null
@@ -1,22 +0,0 @@
-This test replicates messages from two source kafka clusters into one target
-kafka cluster using the mirror-maker tool. At the end, the messages produced
-at the source brokers should match that at the target brokers.
-
-To run this test, do
-bin/run-test.sh
-
-In the event of failure, by default the brokers and zookeepers remain running
-to make it easier to debug the issue - hit Ctrl-C to shut them down. You can
-change this behavior by setting the action_on_fail flag in the script to "exit"
-or "proceed", in which case a snapshot of all the logs and directories is
-placed in the test's base directory.
-
-It is a good idea to run the test in a loop. E.g.:
-
-:>/tmp/mirrormaker_test.log
-for i in {1..10}; do echo "run $i"; ./bin/run-test.sh 2>1 >> /tmp/mirrormaker_test.log; done
-tail -F /tmp/mirrormaker_test.log
-
-grep -ic passed /tmp/mirrormaker_test.log
-grep -ic failed /tmp/mirrormaker_test.log
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/bin/expected.out
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/bin/expected.out b/system_test/mirror_maker/bin/expected.out
deleted file mode 100644
index 0a1bbaf..0000000
--- a/system_test/mirror_maker/bin/expected.out
+++ /dev/null
@@ -1,18 +0,0 @@
-start the servers ...
-start producing messages ...
-wait for consumer to finish consuming ...
-[2011-05-17 14:49:11,605] INFO Creating async producer for broker id = 2 at localhost:9091 (kafka.producer.ProducerPool)
-[2011-05-17 14:49:11,606] INFO Creating async producer for broker id = 1 at localhost:9092 (kafka.producer.ProducerPool)
-[2011-05-17 14:49:11,607] INFO Creating async producer for broker id = 3 at localhost:9090 (kafka.producer.ProducerPool)
-thread 0: 400000 messages sent 3514012.1233 nMsg/sec 3.3453 MBs/sec
-[2011-05-17 14:49:34,382] INFO Closing all async producers (kafka.producer.ProducerPool)
-[2011-05-17 14:49:34,383] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
-[2011-05-17 14:49:34,384] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
-[2011-05-17 14:49:34,385] INFO Closed AsyncProducer (kafka.producer.async.AsyncProducer)
-Total Num Messages: 400000 bytes: 79859641 in 22.93 secs
-Messages/sec: 17444.3960
-MB/sec: 3.3214
-test passed
-stopping the servers
-bin/../../../bin/zookeeper-server-start.sh: line 9: 22584 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
-bin/../../../bin/zookeeper-server-start.sh: line 9: 22585 Terminated $(dirname $0)/kafka-run-class.sh org.apache.zookeeper.server.quorum.QuorumPeerMain $@
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/bin/run-test.sh
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/bin/run-test.sh b/system_test/mirror_maker/bin/run-test.sh
deleted file mode 100644
index e5e6c08..0000000
--- a/system_test/mirror_maker/bin/run-test.sh
+++ /dev/null
@@ -1,357 +0,0 @@
-#!/bin/bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-readonly num_messages=10000
-readonly message_size=100
-readonly action_on_fail="proceed"
-# readonly action_on_fail="wait"
-
-readonly test_start_time="$(date +%s)"
-
-readonly base_dir=$(dirname $0)/..
-
-info() {
- echo -e "$(date +"%Y-%m-%d %H:%M:%S") $*"
-}
-
-kill_child_processes() {
- isTopmost=$1
- curPid=$2
- childPids=$(ps a -o pid= -o ppid= | grep "${curPid}$" | awk '{print $1;}')
- for childPid in $childPids
- do
- kill_child_processes 0 $childPid
- done
- if [ $isTopmost -eq 0 ]; then
- kill -15 $curPid 2> /dev/null
- fi
-}
-
-cleanup() {
- info "cleaning up"
-
- pid_zk_source1=
- pid_zk_source2=
- pid_zk_target=
- pid_kafka_source_1_1=
- pid_kafka_source_1_2=
- pid_kafka_source_2_1=
- pid_kafka_source_2_2=
- pid_kafka_target_1_1=
- pid_kafka_target_1_2=
- pid_producer=
- pid_mirrormaker_1=
- pid_mirrormaker_2=
-
- rm -rf /tmp/zookeeper*
-
- rm -rf /tmp/kafka*
-}
-
-begin_timer() {
- t_begin=$(date +%s)
-}
-
-end_timer() {
- t_end=$(date +%s)
-}
-
-start_zk() {
- info "starting zookeepers"
- $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_1.properties 2>&1 > $base_dir/zookeeper_source-1.log &
- pid_zk_source1=$!
- $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_source_2.properties 2>&1 > $base_dir/zookeeper_source-2.log &
- pid_zk_source2=$!
- $base_dir/../../bin/zookeeper-server-start.sh $base_dir/config/zookeeper_target.properties 2>&1 > $base_dir/zookeeper_target.log &
- pid_zk_target=$!
-}
-
-start_source_servers() {
- info "starting source cluster"
-
- JMX_PORT=1111 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_1.properties 2>&1 > $base_dir/kafka_source-1-1.log &
- pid_kafka_source_1_1=$!
- JMX_PORT=2222 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_1_2.properties 2>&1 > $base_dir/kafka_source-1-2.log &
- pid_kafka_source_1_2=$!
- JMX_PORT=3333 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_1.properties 2>&1 > $base_dir/kafka_source-2-1.log &
- pid_kafka_source_2_1=$!
- JMX_PORT=4444 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_source_2_2.properties 2>&1 > $base_dir/kafka_source-2-2.log &
- pid_kafka_source_2_2=$!
-}
-
-start_target_servers() {
- info "starting mirror cluster"
- JMX_PORT=5555 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_1.properties 2>&1 > $base_dir/kafka_target-1-1.log &
- pid_kafka_target_1_1=$!
- JMX_PORT=6666 $base_dir/../../bin/kafka-run-class.sh kafka.Kafka $base_dir/config/server_target_1_2.properties 2>&1 > $base_dir/kafka_target-1-2.log &
- pid_kafka_target_1_2=$!
-}
-
-shutdown_servers() {
- info "stopping mirror-maker"
- if [ "x${pid_mirrormaker_1}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_1}; fi
- # sleep to avoid rebalancing during shutdown
- sleep 2
- if [ "x${pid_mirrormaker_2}" != "x" ]; then kill_child_processes 0 ${pid_mirrormaker_2}; fi
-
- info "stopping producer"
- if [ "x${pid_producer}" != "x" ]; then kill_child_processes 0 ${pid_producer}; fi
-
- info "shutting down target servers"
- if [ "x${pid_kafka_target_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_1}; fi
- if [ "x${pid_kafka_target_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_target_1_2}; fi
- sleep 2
-
- info "shutting down source servers"
- if [ "x${pid_kafka_source_1_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_1}; fi
- if [ "x${pid_kafka_source_1_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_1_2}; fi
- if [ "x${pid_kafka_source_2_1}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_1}; fi
- if [ "x${pid_kafka_source_2_2}" != "x" ]; then kill_child_processes 0 ${pid_kafka_source_2_2}; fi
-
- info "shutting down zookeeper servers"
- if [ "x${pid_zk_target}" != "x" ]; then kill_child_processes 0 ${pid_zk_target}; fi
- if [ "x${pid_zk_source1}" != "x" ]; then kill_child_processes 0 ${pid_zk_source1}; fi
- if [ "x${pid_zk_source2}" != "x" ]; then kill_child_processes 0 ${pid_zk_source2}; fi
-}
-
-start_producer() {
- topic=$1
- zk=$2
- info "start producing messages for topic $topic to zookeeper $zk ..."
- $base_dir/../../bin/kafka-run-class.sh kafka.perf.ProducerPerformance --brokerinfo zk.connect=$zk --topics $topic --messages $num_messages --message-size $message_size --batch-size 200 --vary-message-size --threads 1 --reporting-interval $num_messages --async 2>&1 > $base_dir/producer_performance.log &
- pid_producer=$!
-}
-
-# Usage: wait_partition_done ([kafka-server] [topic] [partition-id])+
-wait_partition_done() {
- n_tuples=$(($# / 3))
-
- i=1
- while (($#)); do
- kafka_server[i]=$1
- topic[i]=$2
- partitionid[i]=$3
- prev_offset[i]=0
- info "\twaiting for partition on server ${kafka_server[i]}, topic ${topic[i]}, partition ${partitionid[i]}"
- i=$((i+1))
- shift 3
- done
-
- all_done=0
-
- # set -x
- while [[ $all_done != 1 ]]; do
- sleep 4
- i=$n_tuples
- all_done=1
- for ((i=1; i <= $n_tuples; i++)); do
- cur_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server ${kafka_server[i]} --topic ${topic[i]} --partition ${partitionid[i]} --time -1 --offsets 1 | tail -1)
- if [ "x$cur_size" != "x${prev_offset[i]}" ]; then
- all_done=0
- prev_offset[i]=$cur_size
- fi
- done
- done
-
-}
-
-cmp_logs() {
- topic=$1
- info "comparing source and target logs for topic $topic"
- source_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9090 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
- source_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9091 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
- source_part2_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9092 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
- source_part3_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9093 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
- target_part0_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9094 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
- target_part1_size=$($base_dir/../../bin/kafka-run-class.sh kafka.tools.GetOffsetShell --server kafka://localhost:9095 --topic $topic --partition 0 --time -1 --offsets 1 | tail -1)
- if [ "x$source_part0_size" == "x" ]; then source_part0_size=0; fi
- if [ "x$source_part1_size" == "x" ]; then source_part1_size=0; fi
- if [ "x$source_part2_size" == "x" ]; then source_part2_size=0; fi
- if [ "x$source_part3_size" == "x" ]; then source_part3_size=0; fi
- if [ "x$target_part0_size" == "x" ]; then target_part0_size=0; fi
- if [ "x$target_part1_size" == "x" ]; then target_part1_size=0; fi
- expected_size=$(($source_part0_size + $source_part1_size + $source_part2_size + $source_part3_size))
- actual_size=$(($target_part0_size + $target_part1_size))
- if [ "x$expected_size" != "x$actual_size" ]
- then
- info "source size: $expected_size target size: $actual_size"
- return 1
- else
- return 0
- fi
-}
-
-take_fail_snapshot() {
- snapshot_dir="$base_dir/failed-${snapshot_prefix}-${test_start_time}"
- mkdir $snapshot_dir
- for dir in /tmp/zookeeper_source{1..2} /tmp/zookeeper_target /tmp/kafka-source-{1..2}-{1..2}-logs /tmp/kafka-target{1..2}-logs; do
- if [ -d $dir ]; then
- cp -r $dir $snapshot_dir
- fi
- done
-}
-
-# Usage: process_test_result <result> <action_on_fail>
-# result: last test result
-# action_on_fail: (exit|wait|proceed)
-# ("wait" is useful if you want to troubleshoot using zookeeper)
-process_test_result() {
- result=$1
- if [ $1 -eq 0 ]; then
- info "test passed"
- else
- info "test failed"
- case "$2" in
- "wait") info "waiting: hit Ctrl-c to quit"
- wait
- ;;
- "exit") shutdown_servers
- take_fail_snapshot
- exit $result
- ;;
- *) shutdown_servers
- take_fail_snapshot
- info "proceeding"
- ;;
- esac
- fi
-}
-
-test_whitelists() {
- info "### Testing whitelists"
- snapshot_prefix="whitelist-test"
-
- cleanup
- start_zk
- start_source_servers
- start_target_servers
- sleep 4
-
- info "starting mirror makers"
- JMX_PORT=7777 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
- pid_mirrormaker_1=$!
- JMX_PORT=8888 $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/whitelisttest_1.consumer.properties --consumer.config $base_dir/config/whitelisttest_2.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --whitelist="white.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_2.log &
- pid_mirrormaker_2=$!
-
- begin_timer
-
- start_producer whitetopic01 localhost:2181
- start_producer whitetopic01 localhost:2182
- info "waiting for whitetopic01 producers to finish producing ..."
- wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0 kafka://localhost:9092 whitetopic01 0 kafka://localhost:9093 whitetopic01 0
-
- start_producer whitetopic02 localhost:2181
- start_producer whitetopic03 localhost:2181
- start_producer whitetopic04 localhost:2182
- info "waiting for whitetopic02,whitetopic03,whitetopic04 producers to finish producing ..."
- wait_partition_done kafka://localhost:9090 whitetopic02 0 kafka://localhost:9091 whitetopic02 0 kafka://localhost:9090 whitetopic03 0 kafka://localhost:9091 whitetopic03 0 kafka://localhost:9092 whitetopic04 0 kafka://localhost:9093 whitetopic04 0
-
- start_producer blacktopic01 localhost:2182
- info "waiting for blacktopic01 producer to finish producing ..."
- wait_partition_done kafka://localhost:9092 blacktopic01 0 kafka://localhost:9093 blacktopic01 0
-
- info "waiting for consumer to finish consuming ..."
-
- wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0 kafka://localhost:9094 whitetopic02 0 kafka://localhost:9095 whitetopic02 0 kafka://localhost:9094 whitetopic03 0 kafka://localhost:9095 whitetopic03 0 kafka://localhost:9094 whitetopic04 0 kafka://localhost:9095 whitetopic04 0
-
- end_timer
- info "embedded consumer took $((t_end - t_begin)) seconds"
-
- sleep 2
-
- # if [[ -d /tmp/kafka-target-1-1-logs/blacktopic01 || /tmp/kafka-target-1-2-logs/blacktopic01 ]]; then
- # echo "blacktopic01 found on target cluster"
- # result=1
- # else
- # cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
- # result=$?
- # fi
-
- cmp_logs blacktopic01
-
- cmp_logs whitetopic01 && cmp_logs whitetopic02 && cmp_logs whitetopic03 && cmp_logs whitetopic04
- result=$?
-
- return $result
-}
-
-test_blacklists() {
- info "### Testing blacklists"
- snapshot_prefix="blacklist-test"
- cleanup
- start_zk
- start_source_servers
- start_target_servers
- sleep 4
-
- info "starting mirror maker"
- $base_dir/../../bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config $base_dir/config/blacklisttest.consumer.properties --producer.config $base_dir/config/mirror_producer.properties --blacklist="black.*" --num.streams 2 2>&1 > $base_dir/kafka_mirrormaker_1.log &
- pid_mirrormaker_1=$!
-
- start_producer blacktopic01 localhost:2181
- start_producer blacktopic02 localhost:2181
- info "waiting for producer to finish producing blacktopic01,blacktopic02 ..."
- wait_partition_done kafka://localhost:9090 blacktopic01 0 kafka://localhost:9091 blacktopic01 0 kafka://localhost:9090 blacktopic02 0 kafka://localhost:9091 blacktopic02 0
-
- begin_timer
-
- start_producer whitetopic01 localhost:2181
- info "waiting for producer to finish producing whitetopic01 ..."
- wait_partition_done kafka://localhost:9090 whitetopic01 0 kafka://localhost:9091 whitetopic01 0
-
- info "waiting for consumer to finish consuming ..."
- wait_partition_done kafka://localhost:9094 whitetopic01 0 kafka://localhost:9095 whitetopic01 0
-
- end_timer
-
- info "embedded consumer took $((t_end - t_begin)) seconds"
-
- sleep 2
-
- cmp_logs blacktopic01 || cmp_logs blacktopic02
- if [ $? -eq 0 ]; then
- return 1
- fi
-
- cmp_logs whitetopic01
- return $?
-}
-
-# main test begins
-
-echo "Test-$test_start_time"
-
-# Ctrl-c trap. Catches INT signal
-trap "shutdown_servers; exit 0" INT
-
-test_whitelists
-result=$?
-
-process_test_result $result $action_on_fail
-
-shutdown_servers
-
-sleep 2
-
-test_blacklists
-result=$?
-
-process_test_result $result $action_on_fail
-
-shutdown_servers
-
-exit $result
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/blacklisttest.consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/blacklisttest.consumer.properties b/system_test/mirror_maker/config/blacklisttest.consumer.properties
deleted file mode 100644
index ff12015..0000000
--- a/system_test/mirror_maker/config/blacklisttest.consumer.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-#consumer group id
-group.id=group1
-shallow.iterator.enable=true
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/mirror_producer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/mirror_producer.properties b/system_test/mirror_maker/config/mirror_producer.properties
deleted file mode 100644
index aa8be65..0000000
--- a/system_test/mirror_maker/config/mirror_producer.properties
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2183
-# broker.list=1:localhost:9094,2:localhost:9095
-
-# timeout in ms for connecting to zookeeper
-# zk.connection.timeout.ms=1000000
-
-producer.type=async
-
-# to avoid dropping events if the queue is full, wait indefinitely
-queue.enqueue.timeout.ms=-1
-
-num.producers.per.broker=2
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_source_1_1.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_source_1_1.properties b/system_test/mirror_maker/config/server_source_1_1.properties
deleted file mode 100644
index 2f070a7..0000000
--- a/system_test/mirror_maker/config/server_source_1_1.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=1
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost. If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9090
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-source-1-1-logs
-
-# the send buffer used by the socket server
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=10000000
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based topic flusher time rate in ms
-log.flush.scheduler.interval.ms=1000
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_source_1_2.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_source_1_2.properties b/system_test/mirror_maker/config/server_source_1_2.properties
deleted file mode 100644
index f9353e8..0000000
--- a/system_test/mirror_maker/config/server_source_1_2.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=2
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost. If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9091
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-source-1-2-logs
-
-# the send buffer used by the socket server
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based topic flusher time rate in ms
-log.flush.scheduler.interval.ms=1000
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_source_2_1.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_source_2_1.properties b/system_test/mirror_maker/config/server_source_2_1.properties
deleted file mode 100644
index daa01ad..0000000
--- a/system_test/mirror_maker/config/server_source_2_1.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=1
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost. If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9092
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-source-2-1-logs
-
-# the send buffer used by the socket server
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2182
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based topic flusher time rate in ms
-log.flush.scheduler.interval.ms=1000
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_source_2_2.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_source_2_2.properties b/system_test/mirror_maker/config/server_source_2_2.properties
deleted file mode 100644
index be6fdfc..0000000
--- a/system_test/mirror_maker/config/server_source_2_2.properties
+++ /dev/null
@@ -1,76 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=2
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost. If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9093
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-source-2-2-logs
-
-# the send buffer used by the socket server
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2182
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based topic flusher time rate in ms
-log.flush.scheduler.interval.ms=1000
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_target_1_1.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_target_1_1.properties b/system_test/mirror_maker/config/server_target_1_1.properties
deleted file mode 100644
index d37955a..0000000
--- a/system_test/mirror_maker/config/server_target_1_1.properties
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=1
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost. If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9094
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-target-1-1-logs
-
-# the send buffer used by the socket server
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2183
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based topic flusher time rate in ms
-log.flush.scheduler.interval.ms=1000
-
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/server_target_1_2.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/server_target_1_2.properties b/system_test/mirror_maker/config/server_target_1_2.properties
deleted file mode 100644
index aa7546c..0000000
--- a/system_test/mirror_maker/config/server_target_1_2.properties
+++ /dev/null
@@ -1,78 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.server.KafkaConfig for additional details and defaults
-
-# the id of the broker
-broker.id=2
-
-# hostname of broker. If not set, will pick up from the value returned
-# from getLocalHost. If there are multiple interfaces getLocalHost
-# may not be what you want.
-# host.name=
-
-# number of logical partitions on this broker
-num.partitions=1
-
-# the port the socket server runs on
-port=9095
-
-# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
-num.threads=8
-
-# the directory in which to store log files
-log.dir=/tmp/kafka-target-1-2-logs
-
-# the send buffer used by the socket server
-socket.send.buffer.bytes=1048576
-
-# the receive buffer used by the socket server
-socket.receive.buffer.bytes=1048576
-
-# the maximum size of a log segment
-log.segment.bytes=536870912
-
-# the interval between running cleanup on the logs
-log.cleanup.interval.mins=1
-
-# the minimum age of a log file to be eligible for deletion
-log.retention.hours=168
-
-#the number of messages to accept without flushing the log to disk
-log.flush.interval.messages=600
-
-#set the following properties to use zookeeper
-
-# enable connecting to zookeeper
-enable.zookeeper=true
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2183
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-# time based topic flush intervals in ms
-#log.flush.intervals.ms.per.topic=topic:1000
-
-# default time based flush interval in ms
-log.flush.interval.ms=1000
-
-# time based topic flusher time rate in ms
-log.flush.scheduler.interval.ms=1000
-
-# topic partition count map
-# topic.partition.count.map=topic1:3, topic2:4
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties b/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
deleted file mode 100644
index ff12015..0000000
--- a/system_test/mirror_maker/config/whitelisttest_1.consumer.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2181
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-#consumer group id
-group.id=group1
-shallow.iterator.enable=true
-