You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/03/07 04:07:09 UTC

[3/3] git commit: kafka-1240; Add ability to existing system tests to use the new producer client; patched by Jun Rao; reviewed by Neha Narkhede

kafka-1240; Add ability to existing system tests to use the new producer client; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c765d7bd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c765d7bd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c765d7bd

Branch: refs/heads/trunk
Commit: c765d7bd4e30e0b952fc4bc00d142f7939b498a6
Parents: 74c54c7
Author: Jun Rao <ju...@gmail.com>
Authored: Thu Mar 6 19:06:25 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Mar 6 19:06:25 2014 -0800

----------------------------------------------------------------------
 .../kafka/tools/newproducer/MirrorMaker.scala   |   4 +-
 core/src/main/scala/kafka/utils/Utils.scala     |  23 ++-
 .../scala/kafka/perf/ProducerPerformance.scala  |  27 ++-
 system_test/README.txt                          |   5 +-
 .../migration_tool_test.py                      |   1 +
 .../config/mirror_producer.properties           |   5 +-
 .../mirror_maker_testsuite/mirror_maker_test.py |   1 +
 .../testcase_5001_properties.json               | 158 ++++++++++++++++
 .../testcase_5002_properties.json               | 158 ++++++++++++++++
 .../testcase_15003/cluster_config.json          | 135 ++++++++++++++
 .../testcase_5003_properties.json               | 156 ++++++++++++++++
 .../testcase_15004/cluster_config.json          | 135 ++++++++++++++
 .../testcase_5004_properties.json               | 156 ++++++++++++++++
 .../testcase_15005/cluster_config.json          | 153 ++++++++++++++++
 .../testcase_5005_properties.json               | 178 +++++++++++++++++++
 .../testcase_15006/cluster_config.json          | 153 ++++++++++++++++
 .../testcase_5006_properties.json               | 178 +++++++++++++++++++
 .../testcase_5001/testcase_5001_properties.json |   2 +
 .../testcase_5002/testcase_5002_properties.json |   2 +
 .../testcase_5003/testcase_5003_properties.json |   3 +
 .../testcase_5004/testcase_5004_properties.json |   3 +
 .../testcase_5005/testcase_5005_properties.json |   4 +
 .../testcase_5006/testcase_5006_properties.json |   4 +
 .../replication_testsuite/replica_basic_test.py |   1 +
 .../testcase_0001/testcase_0001_properties.json |   1 +
 .../testcase_0002/testcase_0002_properties.json |   1 +
 .../testcase_0003/testcase_0003_properties.json |   1 +
 .../testcase_0004/testcase_0004_properties.json |   1 +
 .../testcase_0005/testcase_0005_properties.json |   1 +
 .../testcase_0006/testcase_0006_properties.json |   1 +
 .../testcase_0007/testcase_0007_properties.json |   1 +
 .../testcase_0008/testcase_0008_properties.json |   1 +
 .../testcase_0009/testcase_0009_properties.json |   1 +
 .../testcase_0010/testcase_0010_properties.json |   1 +
 .../testcase_0011/testcase_0011_properties.json |   1 +
 .../testcase_0021/testcase_0021_properties.json |   2 +
 .../testcase_0022/testcase_0022_properties.json |   2 +
 .../testcase_0023/testcase_0023_properties.json |   2 +
 .../testcase_0024/testcase_0024_properties.json |   1 +
 .../testcase_0101/testcase_0101_properties.json |   1 +
 .../testcase_0102/testcase_0102_properties.json |   1 +
 .../testcase_0103/testcase_0103_properties.json |   1 +
 .../testcase_0104/testcase_0104_properties.json |   1 +
 .../testcase_0105/testcase_0105_properties.json |   1 +
 .../testcase_0106/testcase_0106_properties.json |   1 +
 .../testcase_0107/testcase_0107_properties.json |   1 +
 .../testcase_0108/testcase_0108_properties.json |   1 +
 .../testcase_0109/testcase_0109_properties.json |   1 +
 .../testcase_0110/testcase_0110_properties.json |   1 +
 .../testcase_0111/testcase_0111_properties.json |   1 +
 .../testcase_0112/testcase_0112_properties.json |   1 +
 .../testcase_0113/testcase_0113_properties.json |   1 +
 .../testcase_0114/testcase_0114_properties.json |   1 +
 .../testcase_0115/testcase_0115_properties.json |   1 +
 .../testcase_0116/testcase_0116_properties.json |   1 +
 .../testcase_0117/testcase_0117_properties.json |   1 +
 .../testcase_0118/testcase_0118_properties.json |   1 +
 .../testcase_0119/testcase_0119_properties.json |   1 +
 .../testcase_0121/testcase_0121_properties.json |   2 +
 .../testcase_0122/testcase_0122_properties.json |   2 +
 .../testcase_0123/testcase_0123_properties.json |   2 +
 .../testcase_0124/testcase_0124_properties.json |   2 +
 .../testcase_0125/testcase_0125_properties.json |   2 +
 .../testcase_0126/testcase_0126_properties.json |   2 +
 .../testcase_0127/testcase_0127_properties.json |   2 +
 .../testcase_0128/testcase_0128_properties.json |   1 +
 .../testcase_0131/testcase_0131_properties.json |   2 +
 .../testcase_0132/testcase_0132_properties.json |   2 +
 .../testcase_0133/testcase_0133_properties.json |   2 +
 .../testcase_0134/testcase_0134_properties.json |   1 +
 .../testcase_0151/testcase_0151_properties.json |   1 +
 .../testcase_0152/testcase_0152_properties.json |   1 +
 .../testcase_0153/testcase_0153_properties.json |   1 +
 .../testcase_0154/testcase_0154_properties.json |   1 +
 .../testcase_0155/testcase_0155_properties.json |   1 +
 .../testcase_0156/testcase_0156_properties.json |   1 +
 .../testcase_0157/testcase_0157_properties.json |   1 +
 .../testcase_0158/testcase_0158_properties.json |   1 +
 .../testcase_0159/testcase_0159_properties.json |   1 +
 .../testcase_0201/testcase_0201_properties.json |   1 +
 .../testcase_0202/testcase_0202_properties.json |   1 +
 .../testcase_0203/testcase_0203_properties.json |   1 +
 .../testcase_0204/testcase_0204_properties.json |   1 +
 .../testcase_0205/testcase_0205_properties.json |   1 +
 .../testcase_0206/testcase_0206_properties.json |   1 +
 .../testcase_0207/testcase_0207_properties.json |   1 +
 .../testcase_0208/testcase_0208_properties.json |   1 +
 .../testcase_0209/testcase_0209_properties.json |   1 +
 .../testcase_0251/testcase_0251_properties.json |   1 +
 .../testcase_0252/testcase_0252_properties.json |   1 +
 .../testcase_0253/testcase_0253_properties.json |   1 +
 .../testcase_0254/testcase_0254_properties.json |   1 +
 .../testcase_0255/testcase_0255_properties.json |   1 +
 .../testcase_0256/testcase_0256_properties.json |   1 +
 .../testcase_0257/testcase_0257_properties.json |   1 +
 .../testcase_0258/testcase_0258_properties.json |   1 +
 .../testcase_0259/testcase_0259_properties.json |   1 +
 .../testcase_0301/testcase_0301_properties.json |   1 +
 .../testcase_0302/testcase_0302_properties.json |   1 +
 .../testcase_0303/testcase_0303_properties.json |   1 +
 .../testcase_0304/testcase_0304_properties.json |   1 +
 .../testcase_0305/testcase_0305_properties.json |   1 +
 .../testcase_0306/testcase_0306_properties.json |   1 +
 .../testcase_0307/testcase_0307_properties.json |   1 +
 .../testcase_0308/testcase_0308_properties.json |   1 +
 .../testcase_0309/testcase_0309_properties.json |   1 +
 .../testcase_1/testcase_1_properties.json       |   1 +
 .../testcase_0101_properties.json               |  86 +++++++++
 .../testcase_0102_properties.json               |  86 +++++++++
 .../testcase_0103_properties.json               |  86 +++++++++
 .../testcase_0104_properties.json               |  86 +++++++++
 .../testcase_0105_properties.json               |  86 +++++++++
 .../testcase_0106_properties.json               |  86 +++++++++
 .../testcase_0107_properties.json               |  86 +++++++++
 .../testcase_0108_properties.json               |  86 +++++++++
 .../testcase_0109_properties.json               |  86 +++++++++
 .../testcase_0110_properties.json               |  86 +++++++++
 .../testcase_10131/cluster_config.json          |  76 ++++++++
 .../testcase_0131_properties.json               | 110 ++++++++++++
 .../testcase_10132/cluster_config.json          |  76 ++++++++
 .../testcase_0132_properties.json               | 107 +++++++++++
 .../testcase_10133/cluster_config.json          |  76 ++++++++
 .../testcase_0133_properties.json               | 107 +++++++++++
 .../testcase_0134_properties.json               |  92 ++++++++++
 .../testcase_4001/testcase_4001_properties.json |   2 +
 .../testcase_4002/testcase_4002_properties.json |   2 +
 .../testcase_4003/testcase_4003_properties.json |   2 +
 .../testcase_4004/testcase_4004_properties.json |   2 +
 .../testcase_4005/testcase_4005_properties.json |   2 +
 .../testcase_4006/testcase_4006_properties.json |   2 +
 .../testcase_4007/testcase_4007_properties.json |   2 +
 .../testcase_4008/testcase_4008_properties.json |   2 +
 .../testcase_4011/testcase_4011_properties.json |   2 +
 .../testcase_4012/testcase_4012_properties.json |   2 +
 .../testcase_4013/testcase_4013_properties.json |   2 +
 .../testcase_4014/testcase_4014_properties.json |   2 +
 .../testcase_4015/testcase_4015_properties.json |   2 +
 .../testcase_4016/testcase_4016_properties.json |   2 +
 .../testcase_4017/testcase_4017_properties.json |   2 +
 .../testcase_4018/testcase_4018_properties.json |   2 +
 .../testcase_9051/testcase_9051_properties.json |   1 +
 system_test/testcase_to_run_all.json            |  26 ++-
 system_test/utils/kafka_system_test_utils.py    |  34 +++-
 143 files changed, 3314 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
index d23ef9a..d1dc13b 100644
--- a/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/newproducer/MirrorMaker.scala
@@ -168,10 +168,10 @@ object MirrorMaker extends Logging {
         val producerId = Utils.abs(java.util.Arrays.hashCode(producerRecord.key())) % producers.size
         trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(producerRecord.key()), producerId))
         val producer = producers(producerId)
-        producer.send(producerRecord)
+        producer.send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value()))
       } else {
         val producerId = producerIndex.getAndSet((producerIndex.get() + 1) % producers.size)
-        producers(producerId).send(producerRecord)
+        producers(producerId).send(producerRecord, Utils.errorLoggingCallback(producerRecord.key(), producerRecord.value()))
         trace("Sent message to producer " + producerId)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index a89b046..33e05f0 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -30,6 +30,7 @@ import scala.collection.mutable
 import java.util.Properties
 import kafka.common.KafkaException
 import kafka.common.KafkaStorageException
+import org.apache.kafka.clients.producer.{RecordMetadata, Callback}
 
 
 /**
@@ -540,5 +541,25 @@ object Utils extends Logging {
       lock.unlock()
     }
   }
-  
+
+  def errorLoggingCallback(key: Array[Byte], value: Array[Byte], logAsString: Boolean = false) = {
+    new Callback() {
+      def onCompletion(metadata: RecordMetadata, e: Exception) {
+        if (e != null) {
+          val keyString = if (key == null)
+                            "null"
+                          else {
+                            if (logAsString) new String(key) else key.length + " bytes"
+                          }
+          val valueString = if (value == null)
+                            "null"
+                          else {
+                            if (logAsString) new String(value) else value.length + " bytes"
+                          }
+          error("Error when sending message with key: " + keyString + ", value: " + valueString +
+                " with exception " + e.getMessage)
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
----------------------------------------------------------------------
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 5d399d9..f061dba 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -21,15 +21,14 @@ import java.util.concurrent.{ CountDownLatch, Executors }
 import java.util.concurrent.atomic.AtomicLong
 import kafka.producer._
 import org.apache.log4j.Logger
-import kafka.message.{ CompressionCodec, Message }
+import kafka.message.CompressionCodec
 import java.text.SimpleDateFormat
 import kafka.serializer._
 import java.util._
 import collection.immutable.List
 import kafka.utils.{ VerifiableProperties, Logging, Utils }
 import kafka.metrics.KafkaMetricsReporter
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 
 /**
  * Load test for the producer
@@ -210,11 +209,18 @@ object ProducerPerformance extends Logging {
     props.put("client.id", "perf-test")
     props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
     props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
+    props.put("request.retries", config.producerNumRetries.toString)
+    props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
     val producer = new KafkaProducer(props)
 
     def send(topic: String, partition: Long, bytes: Array[Byte]) {
       val part = partition % this.producer.partitionsFor(topic).size
-      this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes))
+      if (config.isSync) {
+        this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes)).get()
+      } else {
+        this.producer.send(new ProducerRecord(topic, Utils.abs(part.toInt), null, bytes),
+                           Utils.errorLoggingCallback(null, bytes, if (config.seqIdMode) true else false))
+      }
     }
 
     def close() {
@@ -280,21 +286,28 @@ object ProducerPerformance extends Logging {
       var bytesSent = 0L
       var nSends = 0
       var j: Long = 0L
+      var message: Array[Byte] = null
+
       while (j < messagesPerThread) {
         try {
           config.topics.foreach(
             topic => {
-              producer.send(topic, j, generateProducerData(topic, j))
+              message = generateProducerData(topic, j)
+              producer.send(topic, j, message)
               nSends += 1
               if (config.messageSendGapMs > 0)
                 Thread.sleep(config.messageSendGapMs)
             })
         } catch {
-          case e: Exception => error("Error sending messages", e)
+          case e: Throwable => error("Error when sending message " + new String(message), e)
         }
         j += 1
       }
-      producer.close()
+      try {
+        producer.close()
+      } catch {
+        case e: Throwable => error("Error when closing producer", e)
+      }
       totalBytesSent.addAndGet(bytesSent)
       totalMessagesSent.addAndGet(nSends)
       allDone.countDown()

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/README.txt
----------------------------------------------------------------------
diff --git a/system_test/README.txt b/system_test/README.txt
index d89ad9d..87937ec 100644
--- a/system_test/README.txt
+++ b/system_test/README.txt
@@ -76,4 +76,7 @@ To create a new test case under "replication_testsuite", please do the following
   2. Rename system_test/replication_testsuite/testcase_2/testcase_1_properties.json => system_test/replication_testsuite/testcase_2/testcase_2_properties.json
   3. Update system_test/replication_testsuite/testcase_2/testcase_2_properties.json with the corresponding settings for testcase 2.
 
-
+Note:
+The following testcases are for the old producer and the old mirror maker. We can remove them once we phase out the old producer client.
+  replication_testsuite: testcase_{10101 - 10110} testcase_{10131 - 10134}
+  mirror_maker_testsuite: testcase_{15001 - 15006}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/migration_tool_testsuite/migration_tool_test.py
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/migration_tool_test.py b/system_test/migration_tool_testsuite/migration_tool_test.py
index ce6f4f6..2fecd19 100644
--- a/system_test/migration_tool_testsuite/migration_tool_test.py
+++ b/system_test/migration_tool_testsuite/migration_tool_test.py
@@ -247,6 +247,7 @@ class MigrationToolTest(ReplicationUtils, SetupUtils):
                         str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
                     if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
                         time.sleep(1)
+                        self.testcaseEnv.lock.release()
                         self.logger.info("all producer threads completed", extra=self.d)
                         break
                     time.sleep(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/config/mirror_producer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/config/mirror_producer.properties b/system_test/mirror_maker_testsuite/config/mirror_producer.properties
index b2bf2c2..4391bc8 100644
--- a/system_test/mirror_maker_testsuite/config/mirror_producer.properties
+++ b/system_test/mirror_maker_testsuite/config/mirror_producer.properties
@@ -1,6 +1,5 @@
-producer.type=async
-queue.enqueue.timeout.ms=-1
+block.on.buffer.full=true
 metadata.broker.list=localhost:9094
 compression.codec=0
-message.send.max.retries=3
+request.retries=3
 request.required.acks=1

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py
index dfffb4e..fd18088 100644
--- a/system_test/mirror_maker_testsuite/mirror_maker_test.py
+++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py
@@ -248,6 +248,7 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils):
                         str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
                     if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
                         time.sleep(1)
+                        self.testcaseEnv.lock.release()
                         self.logger.info("all producer threads completed", extra=self.d)
                         break
                     time.sleep(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15001/testcase_5001_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15001/testcase_5001_properties.json b/system_test/mirror_maker_testsuite/testcase_15001/testcase_5001_properties.json
new file mode 100644
index 0000000..287cab9
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15001/testcase_5001_properties.json
@@ -0,0 +1,158 @@
+{
+  "description": {"01":"To Test : 'Replication with Mirror Maker'",
+                  "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET",
+                  "03":"Set up 2-node Zk cluster for both SOURCE & TARGET",
+                  "04":"Produce and consume messages to a single topic - single partition.",
+                  "05":"This test sends messages to 3 replicas",
+                  "06":"At the end it verifies the log size and contents",
+                  "07":"Use a consumer to verify no message loss in TARGET cluster.",
+                  "08":"Producer dimensions : mode:sync, acks:-1, comp:0",
+                  "09":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "bounce_mirror_maker": "false",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "clientPort": "2118",
+      "dataDir": "/tmp/zookeeper_1",
+      "log_filename": "zookeeper_1.log",
+      "config_filename": "zookeeper_1.properties"
+    },
+
+    {
+      "entity_id": "2",
+      "clientPort": "2128",
+      "dataDir": "/tmp/zookeeper_2",
+      "log_filename": "zookeeper_2.log",
+      "config_filename": "zookeeper_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "clientPort": "2138",
+      "dataDir": "/tmp/zookeeper_3",
+      "log_filename": "zookeeper_3.log",
+      "config_filename": "zookeeper_3.properties"
+    },
+
+    {
+      "entity_id": "4",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_5.log",
+      "config_filename": "kafka_server_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_6.log",
+      "config_filename": "kafka_server_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_7.log",
+      "config_filename": "kafka_server_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9095",
+      "broker.id": "5",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_8.log",
+      "config_filename": "kafka_server_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "port": "9096",
+      "broker.id": "6",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_9_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_9.log",
+      "config_filename": "kafka_server_9.properties"
+    },
+
+    {
+      "entity_id": "10",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "500",
+      "request-num-acks": "-1",
+      "sync":"true",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "11",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_11.log",
+      "config_filename": "console_consumer_11.properties"
+    },
+
+    {
+      "entity_id": "12",
+      "log_filename": "mirror_maker_12.log",
+      "mirror_consumer_config_filename": "mirror_consumer_12.properties",
+      "mirror_producer_config_filename": "mirror_producer_12.properties"
+    },
+
+    {
+      "entity_id": "13",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_13.log",
+      "config_filename": "console_consumer_13.properties"
+    }
+   ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15002/testcase_5002_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15002/testcase_5002_properties.json b/system_test/mirror_maker_testsuite/testcase_15002/testcase_5002_properties.json
new file mode 100644
index 0000000..5457eb1
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15002/testcase_5002_properties.json
@@ -0,0 +1,158 @@
+{
+  "description": {"01":"Replication with Mirror Maker => Bounce Mirror Maker",
+                  "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET",
+                  "03":"Set up 2-node Zk cluster for both SOURCE & TARGET",
+                  "04":"Produce and consume messages to a single topic - single partition.",
+                  "05":"This test sends messages to 3 replicas",
+                  "06":"At the end it verifies the log size and contents",
+                  "07":"Use a consumer to verify no message loss in TARGET cluster.",
+                  "08":"Producer dimensions : mode:sync, acks:-1, comp:0",
+                  "09":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "bounce_mirror_maker": "true",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "clientPort": "2118",
+      "dataDir": "/tmp/zookeeper_1",
+      "log_filename": "zookeeper_1.log",
+      "config_filename": "zookeeper_1.properties"
+    },
+
+    {
+      "entity_id": "2",
+      "clientPort": "2128",
+      "dataDir": "/tmp/zookeeper_2",
+      "log_filename": "zookeeper_2.log",
+      "config_filename": "zookeeper_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "clientPort": "2138",
+      "dataDir": "/tmp/zookeeper_3",
+      "log_filename": "zookeeper_3.log",
+      "config_filename": "zookeeper_3.properties"
+    },
+
+    {
+      "entity_id": "4",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_5.log",
+      "config_filename": "kafka_server_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_6.log",
+      "config_filename": "kafka_server_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_7.log",
+      "config_filename": "kafka_server_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9095",
+      "broker.id": "5",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_8.log",
+      "config_filename": "kafka_server_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "port": "9096",
+      "broker.id": "6",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_9_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_9.log",
+      "config_filename": "kafka_server_9.properties"
+    },
+
+    {
+      "entity_id": "10",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "sync":"true",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "11",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_11.log",
+      "config_filename": "console_consumer_11.properties"
+    },
+
+    {
+      "entity_id": "12",
+      "log_filename": "mirror_maker_12.log",
+      "mirror_consumer_config_filename": "mirror_consumer_12.properties",
+      "mirror_producer_config_filename": "mirror_producer_12.properties"
+    },
+
+    {
+      "entity_id": "13",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_13.log",
+      "config_filename": "console_consumer_13.properties"
+    }
+   ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15003/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15003/cluster_config.json b/system_test/mirror_maker_testsuite/testcase_15003/cluster_config.json
new file mode 100644
index 0000000..f6fe867
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15003/cluster_config.json
@@ -0,0 +1,135 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9100"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9101"
+        },
+
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9102"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9103"
+        },
+
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9104"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9105"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9106"
+        },
+
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9107"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9108"
+        },
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9109"
+        },
+
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9110"
+        },
+        {
+            "entity_id": "11",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9111"
+        },
+
+        {
+            "entity_id": "12",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9112"
+        },
+        {
+            "entity_id": "13",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9113"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15003/testcase_5003_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15003/testcase_5003_properties.json b/system_test/mirror_maker_testsuite/testcase_15003/testcase_5003_properties.json
new file mode 100644
index 0000000..98fefee
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15003/testcase_5003_properties.json
@@ -0,0 +1,156 @@
+{
+  "description": {"01":"Replication with Mirror Maker => Bounce Mirror Maker",
+                  "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET",
+                  "03":"Set up 2-node Zk cluster for both SOURCE & TARGET",
+                  "04":"Produce and consume messages to a single topic - single partition.",
+                  "05":"This test sends messages to 3 replicas",
+                  "06":"At the end it verifies the log size and contents",
+                  "07":"Use a consumer to verify no message loss in TARGET cluster.",
+                  "08":"Producer dimensions : mode:async, acks:-1, comp:1",
+                  "09":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "bounce_mirror_maker": "true",
+    "bounced_entity_downtime_sec": "30",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "clientPort": "2118",
+      "dataDir": "/tmp/zookeeper_1",
+      "log_filename": "zookeeper_1.log",
+      "config_filename": "zookeeper_1.properties"
+    },
+
+    {
+      "entity_id": "2",
+      "clientPort": "2128",
+      "dataDir": "/tmp/zookeeper_2",
+      "log_filename": "zookeeper_2.log",
+      "config_filename": "zookeeper_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "clientPort": "2138",
+      "dataDir": "/tmp/zookeeper_3",
+      "log_filename": "zookeeper_3.log",
+      "config_filename": "zookeeper_3.properties"
+    },
+
+    {
+      "entity_id": "4",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_5.log",
+      "config_filename": "kafka_server_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_6.log",
+      "config_filename": "kafka_server_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_7.log",
+      "config_filename": "kafka_server_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9095",
+      "broker.id": "5",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_8.log",
+      "config_filename": "kafka_server_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "port": "9096",
+      "broker.id": "6",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_9_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_9.log",
+      "config_filename": "kafka_server_9.properties"
+    },
+
+    {
+      "entity_id": "10",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "2",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "sync":"false",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "11",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_11.log",
+      "config_filename": "console_consumer_11.properties"
+    },
+
+    {
+      "entity_id": "12",
+      "log_filename": "mirror_maker_12.log",
+      "mirror_consumer_config_filename": "mirror_consumer_12.properties",
+      "mirror_producer_config_filename": "mirror_producer_12.properties"
+    },
+    {
+      "entity_id": "13",
+      "log_filename": "mirror_maker_13.log",
+      "mirror_consumer_config_filename": "mirror_consumer_13.properties",
+      "mirror_producer_config_filename": "mirror_producer_13.properties"
+    }
+   ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15004/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15004/cluster_config.json b/system_test/mirror_maker_testsuite/testcase_15004/cluster_config.json
new file mode 100644
index 0000000..f6fe867
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15004/cluster_config.json
@@ -0,0 +1,135 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9100"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9101"
+        },
+
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9102"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9103"
+        },
+
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9104"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9105"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9106"
+        },
+
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9107"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9108"
+        },
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9109"
+        },
+
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9110"
+        },
+        {
+            "entity_id": "11",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9111"
+        },
+
+        {
+            "entity_id": "12",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9112"
+        },
+        {
+            "entity_id": "13",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9113"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15004/testcase_5004_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15004/testcase_5004_properties.json b/system_test/mirror_maker_testsuite/testcase_15004/testcase_5004_properties.json
new file mode 100644
index 0000000..6067b12
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15004/testcase_5004_properties.json
@@ -0,0 +1,156 @@
+{
+  "description": {"01":"Replication with Mirror Maker => Bounce Mirror Maker",
+                  "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET",
+                  "03":"Set up 2-node Zk cluster for both SOURCE & TARGET",
+                  "04":"Produce and consume messages to a single topic - single partition.",
+                  "05":"This test sends messages to 3 replicas",
+                  "06":"At the end it verifies the log size and contents",
+                  "07":"Use a consumer to verify no message loss in TARGET cluster.",
+                  "08":"Producer dimensions : mode:async, acks:1, comp:1",
+                  "09":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "bounce_mirror_maker": "true",
+    "bounced_entity_downtime_sec": "30",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "clientPort": "2118",
+      "dataDir": "/tmp/zookeeper_1",
+      "log_filename": "zookeeper_1.log",
+      "config_filename": "zookeeper_1.properties"
+    },
+
+    {
+      "entity_id": "2",
+      "clientPort": "2128",
+      "dataDir": "/tmp/zookeeper_2",
+      "log_filename": "zookeeper_2.log",
+      "config_filename": "zookeeper_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "clientPort": "2138",
+      "dataDir": "/tmp/zookeeper_3",
+      "log_filename": "zookeeper_3.log",
+      "config_filename": "zookeeper_3.properties"
+    },
+
+    {
+      "entity_id": "4",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_5.log",
+      "config_filename": "kafka_server_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_6.log",
+      "config_filename": "kafka_server_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_7.log",
+      "config_filename": "kafka_server_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9095",
+      "broker.id": "5",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_8.log",
+      "config_filename": "kafka_server_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "port": "9096",
+      "broker.id": "6",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_9_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_9.log",
+      "config_filename": "kafka_server_9.properties"
+    },
+
+    {
+      "entity_id": "10",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "1",
+      "sync":"false",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "11",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_11.log",
+      "config_filename": "console_consumer_11.properties"
+    },
+
+    {
+      "entity_id": "12",
+      "log_filename": "mirror_maker_12.log",
+      "mirror_consumer_config_filename": "mirror_consumer_12.properties",
+      "mirror_producer_config_filename": "mirror_producer_12.properties"
+    },
+    {
+      "entity_id": "13",
+      "log_filename": "mirror_maker_13.log",
+      "mirror_consumer_config_filename": "mirror_consumer_13.properties",
+      "mirror_producer_config_filename": "mirror_producer_13.properties"
+    }
+   ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15005/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15005/cluster_config.json b/system_test/mirror_maker_testsuite/testcase_15005/cluster_config.json
new file mode 100644
index 0000000..63ba37b
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15005/cluster_config.json
@@ -0,0 +1,153 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9100"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9101"
+        },
+
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9102"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9103"
+        },
+
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9104"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9105"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9106"
+        },
+
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9107"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9108"
+        },
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9109"
+        },
+
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9110"
+        },
+        {
+            "entity_id": "11",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9111"
+        },
+        {
+            "entity_id": "12",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9112"
+        },
+        {
+            "entity_id": "13",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9113"
+        },
+
+        {
+            "entity_id": "14",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9114"
+        },
+        {
+            "entity_id": "15",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9115"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15005/testcase_5005_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15005/testcase_5005_properties.json b/system_test/mirror_maker_testsuite/testcase_15005/testcase_5005_properties.json
new file mode 100644
index 0000000..58483ad
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15005/testcase_5005_properties.json
@@ -0,0 +1,178 @@
+{
+  "description": {"01":"Replication with Mirror Maker => Bounce Mirror Maker",
+                  "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET",
+                  "03":"Set up 2-node Zk cluster for both SOURCE & TARGET",
+                  "04":"Produce and consume messages to 2 topics - 2 partitions.",
+                  "05":"This test sends messages to 3 replicas",
+                  "06":"At the end it verifies the log size and contents",
+                  "07":"Use a consumer to verify no message loss in TARGET cluster.",
+                  "08":"Producer dimensions : mode:async, acks:-1, comp:1",
+                  "09":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "bounce_mirror_maker": "true",
+    "bounced_entity_downtime_sec": "30",
+    "replica_factor": "3",
+    "num_partition": "2",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "clientPort": "2118",
+      "dataDir": "/tmp/zookeeper_1",
+      "log_filename": "zookeeper_1.log",
+      "config_filename": "zookeeper_1.properties"
+    },
+
+    {
+      "entity_id": "2",
+      "clientPort": "2128",
+      "dataDir": "/tmp/zookeeper_2",
+      "log_filename": "zookeeper_2.log",
+      "config_filename": "zookeeper_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "clientPort": "2138",
+      "dataDir": "/tmp/zookeeper_3",
+      "log_filename": "zookeeper_3.log",
+      "config_filename": "zookeeper_3.properties"
+    },
+
+    {
+      "entity_id": "4",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_5.log",
+      "config_filename": "kafka_server_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_6.log",
+      "config_filename": "kafka_server_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_7.log",
+      "config_filename": "kafka_server_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9095",
+      "broker.id": "5",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_8.log",
+      "config_filename": "kafka_server_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "port": "9096",
+      "broker.id": "6",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_9_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_9.log",
+      "config_filename": "kafka_server_9.properties"
+    },
+
+    {
+      "entity_id": "10",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "sync":"false",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "11",
+      "topic": "test_2",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "-1",
+      "sync":"false",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_11.log",
+      "config_filename": "producer_performance_11.properties"
+    },
+
+    {
+      "entity_id": "12",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_12.log",
+      "config_filename": "console_consumer_12.properties"
+    },
+    {
+      "entity_id": "13",
+      "topic": "test_2",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_13.log",
+      "config_filename": "console_consumer_13.properties"
+    },
+
+    {
+      "entity_id": "14",
+      "log_filename": "mirror_maker_14.log",
+      "mirror_consumer_config_filename": "mirror_consumer_14.properties",
+      "mirror_producer_config_filename": "mirror_producer_14.properties"
+    },
+    {
+      "entity_id": "15",
+      "log_filename": "mirror_maker_15.log",
+      "mirror_consumer_config_filename": "mirror_consumer_15.properties",
+      "mirror_producer_config_filename": "mirror_producer_15.properties"
+    }
+   ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15006/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15006/cluster_config.json b/system_test/mirror_maker_testsuite/testcase_15006/cluster_config.json
new file mode 100644
index 0000000..63ba37b
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15006/cluster_config.json
@@ -0,0 +1,153 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9100"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9101"
+        },
+
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9102"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9103"
+        },
+
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9104"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9105"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9106"
+        },
+
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9107"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9108"
+        },
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9109"
+        },
+
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9110"
+        },
+        {
+            "entity_id": "11",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9111"
+        },
+        {
+            "entity_id": "12",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9112"
+        },
+        {
+            "entity_id": "13",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9113"
+        },
+
+        {
+            "entity_id": "14",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9114"
+        },
+        {
+            "entity_id": "15",
+            "hostname": "localhost",
+            "role": "mirror_maker",
+            "cluster_name":"target",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9115"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_15006/testcase_5006_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_15006/testcase_5006_properties.json b/system_test/mirror_maker_testsuite/testcase_15006/testcase_5006_properties.json
new file mode 100644
index 0000000..1d9190c
--- /dev/null
+++ b/system_test/mirror_maker_testsuite/testcase_15006/testcase_5006_properties.json
@@ -0,0 +1,178 @@
+{
+  "description": {"01":"Replication with Mirror Maker => Bounce Mirror Maker",
+                  "02":"Set up 2 clusters such as : SOURCE => MirrorMaker => TARGET",
+                  "03":"Set up 2-node Zk cluster for both SOURCE & TARGET",
+                  "04":"Produce and consume messages to 2 topics - 2 partitions.",
+                  "05":"This test sends messages to 3 replicas",
+                  "06":"At the end it verifies the log size and contents",
+                  "07":"Use a consumer to verify no message loss in TARGET cluster.",
+                  "08":"Producer dimensions : mode:async, acks:1, comp:1",
+                  "09":"Log segment size    : 10240"
+  },
+  "testcase_args": {
+    "bounce_leader": "false",
+    "bounce_mirror_maker": "true",
+    "bounced_entity_downtime_sec": "30",
+    "replica_factor": "3",
+    "num_partition": "2",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "clientPort": "2118",
+      "dataDir": "/tmp/zookeeper_1",
+      "log_filename": "zookeeper_1.log",
+      "config_filename": "zookeeper_1.properties"
+    },
+
+    {
+      "entity_id": "2",
+      "clientPort": "2128",
+      "dataDir": "/tmp/zookeeper_2",
+      "log_filename": "zookeeper_2.log",
+      "config_filename": "zookeeper_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "clientPort": "2138",
+      "dataDir": "/tmp/zookeeper_3",
+      "log_filename": "zookeeper_3.log",
+      "config_filename": "zookeeper_3.properties"
+    },
+
+    {
+      "entity_id": "4",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_5_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_5.log",
+      "config_filename": "kafka_server_5.properties"
+    },
+    {
+      "entity_id": "6",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_6_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_6.log",
+      "config_filename": "kafka_server_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_7_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_7.log",
+      "config_filename": "kafka_server_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "port": "9095",
+      "broker.id": "5",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_8_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_8.log",
+      "config_filename": "kafka_server_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "port": "9096",
+      "broker.id": "6",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_9_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_9.log",
+      "config_filename": "kafka_server_9.properties"
+    },
+
+    {
+      "entity_id": "10",
+      "topic": "test_1",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "1",
+      "sync":"false",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "11",
+      "topic": "test_2",
+      "threads": "5",
+      "compression-codec": "1",
+      "message-size": "500",
+      "message": "100",
+      "request-num-acks": "1",
+      "sync":"false",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_11.log",
+      "config_filename": "producer_performance_11.properties"
+    },
+
+    {
+      "entity_id": "12",
+      "topic": "test_1",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_12.log",
+      "config_filename": "console_consumer_12.properties"
+    },
+    {
+      "entity_id": "13",
+      "topic": "test_2",
+      "group.id": "mytestgroup",
+      "consumer-timeout-ms": "10000",
+      "log_filename": "console_consumer_13.log",
+      "config_filename": "console_consumer_13.properties"
+    },
+
+    {
+      "entity_id": "14",
+      "log_filename": "mirror_maker_14.log",
+      "mirror_consumer_config_filename": "mirror_consumer_14.properties",
+      "mirror_producer_config_filename": "mirror_producer_14.properties"
+    },
+    {
+      "entity_id": "15",
+      "log_filename": "mirror_maker_15.log",
+      "mirror_consumer_config_filename": "mirror_consumer_15.properties",
+      "mirror_producer_config_filename": "mirror_producer_15.properties"
+    }
+   ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json b/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json
index 287cab9..4a0da6e 100644
--- a/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json
+++ b/system_test/mirror_maker_testsuite/testcase_5001/testcase_5001_properties.json
@@ -119,6 +119,7 @@
 
     {
       "entity_id": "10",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -141,6 +142,7 @@
 
     {
       "entity_id": "12",
+      "new-producer":"true",
       "log_filename": "mirror_maker_12.log",
       "mirror_consumer_config_filename": "mirror_consumer_12.properties",
       "mirror_producer_config_filename": "mirror_producer_12.properties"

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_5002/testcase_5002_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_5002/testcase_5002_properties.json b/system_test/mirror_maker_testsuite/testcase_5002/testcase_5002_properties.json
index 5457eb1..d74e97d 100644
--- a/system_test/mirror_maker_testsuite/testcase_5002/testcase_5002_properties.json
+++ b/system_test/mirror_maker_testsuite/testcase_5002/testcase_5002_properties.json
@@ -119,6 +119,7 @@
 
     {
       "entity_id": "10",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",
@@ -141,6 +142,7 @@
 
     {
       "entity_id": "12",
+      "new-producer":"true",
       "log_filename": "mirror_maker_12.log",
       "mirror_consumer_config_filename": "mirror_consumer_12.properties",
       "mirror_producer_config_filename": "mirror_producer_12.properties"

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_5003/testcase_5003_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_5003/testcase_5003_properties.json b/system_test/mirror_maker_testsuite/testcase_5003/testcase_5003_properties.json
index 98fefee..e33acf1 100644
--- a/system_test/mirror_maker_testsuite/testcase_5003/testcase_5003_properties.json
+++ b/system_test/mirror_maker_testsuite/testcase_5003/testcase_5003_properties.json
@@ -120,6 +120,7 @@
 
     {
       "entity_id": "10",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "2",
@@ -142,12 +143,14 @@
 
     {
       "entity_id": "12",
+      "new-producer":"true",
       "log_filename": "mirror_maker_12.log",
       "mirror_consumer_config_filename": "mirror_consumer_12.properties",
       "mirror_producer_config_filename": "mirror_producer_12.properties"
     },
     {
       "entity_id": "13",
+      "new-producer":"true",
       "log_filename": "mirror_maker_13.log",
       "mirror_consumer_config_filename": "mirror_consumer_13.properties",
       "mirror_producer_config_filename": "mirror_producer_13.properties"

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_5004/testcase_5004_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_5004/testcase_5004_properties.json b/system_test/mirror_maker_testsuite/testcase_5004/testcase_5004_properties.json
index 6067b12..5c39bcf 100644
--- a/system_test/mirror_maker_testsuite/testcase_5004/testcase_5004_properties.json
+++ b/system_test/mirror_maker_testsuite/testcase_5004/testcase_5004_properties.json
@@ -120,6 +120,7 @@
 
     {
       "entity_id": "10",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -142,12 +143,14 @@
 
     {
       "entity_id": "12",
+      "new-producer":"true",
       "log_filename": "mirror_maker_12.log",
       "mirror_consumer_config_filename": "mirror_consumer_12.properties",
       "mirror_producer_config_filename": "mirror_producer_12.properties"
     },
     {
       "entity_id": "13",
+      "new-producer":"true",
       "log_filename": "mirror_maker_13.log",
       "mirror_consumer_config_filename": "mirror_consumer_13.properties",
       "mirror_producer_config_filename": "mirror_producer_13.properties"

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_5005/testcase_5005_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_5005/testcase_5005_properties.json b/system_test/mirror_maker_testsuite/testcase_5005/testcase_5005_properties.json
index 58483ad..697af65 100644
--- a/system_test/mirror_maker_testsuite/testcase_5005/testcase_5005_properties.json
+++ b/system_test/mirror_maker_testsuite/testcase_5005/testcase_5005_properties.json
@@ -120,6 +120,7 @@
 
     {
       "entity_id": "10",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -133,6 +134,7 @@
     },
     {
       "entity_id": "11",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",
@@ -164,12 +166,14 @@
 
     {
       "entity_id": "14",
+      "new-producer":"true",
       "log_filename": "mirror_maker_14.log",
       "mirror_consumer_config_filename": "mirror_consumer_14.properties",
       "mirror_producer_config_filename": "mirror_producer_14.properties"
     },
     {
       "entity_id": "15",
+      "new-producer":"true",
       "log_filename": "mirror_maker_15.log",
       "mirror_consumer_config_filename": "mirror_consumer_15.properties",
       "mirror_producer_config_filename": "mirror_producer_15.properties"

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/mirror_maker_testsuite/testcase_5006/testcase_5006_properties.json
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/testcase_5006/testcase_5006_properties.json b/system_test/mirror_maker_testsuite/testcase_5006/testcase_5006_properties.json
index 1d9190c..a610a60 100644
--- a/system_test/mirror_maker_testsuite/testcase_5006/testcase_5006_properties.json
+++ b/system_test/mirror_maker_testsuite/testcase_5006/testcase_5006_properties.json
@@ -120,6 +120,7 @@
 
     {
       "entity_id": "10",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",
@@ -133,6 +134,7 @@
     },
     {
       "entity_id": "11",
+      "new-producer":"true",
       "topic": "test_2",
       "threads": "5",
       "compression-codec": "1",
@@ -164,12 +166,14 @@
 
     {
       "entity_id": "14",
+      "new-producer":"true",
       "log_filename": "mirror_maker_14.log",
       "mirror_consumer_config_filename": "mirror_consumer_14.properties",
       "mirror_producer_config_filename": "mirror_producer_14.properties"
     },
     {
       "entity_id": "15",
+      "new-producer":"true",
       "log_filename": "mirror_maker_15.log",
       "mirror_consumer_config_filename": "mirror_consumer_15.properties",
       "mirror_producer_config_filename": "mirror_producer_15.properties"

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index e20130b..5d3d93e 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -363,6 +363,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
                         str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
                     if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
                         time.sleep(1)
+                        self.testcaseEnv.lock.release()
                         self.logger.info("all producer threads completed", extra=self.d)
                         break
                     time.sleep(1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json b/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
index 9bc164b..eaaa4ed 100644
--- a/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
+++ b/system_test/replication_testsuite/testcase_0001/testcase_0001_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "0",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json b/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json
index c90d753..0ffbf67 100644
--- a/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json
+++ b/system_test/replication_testsuite/testcase_0002/testcase_0002_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0003/testcase_0003_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0003/testcase_0003_properties.json b/system_test/replication_testsuite/testcase_0003/testcase_0003_properties.json
index b62b8aa..e2fb579 100644
--- a/system_test/replication_testsuite/testcase_0003/testcase_0003_properties.json
+++ b/system_test/replication_testsuite/testcase_0003/testcase_0003_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0004/testcase_0004_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0004/testcase_0004_properties.json b/system_test/replication_testsuite/testcase_0004/testcase_0004_properties.json
index b91cef8..62fbe08 100644
--- a/system_test/replication_testsuite/testcase_0004/testcase_0004_properties.json
+++ b/system_test/replication_testsuite/testcase_0004/testcase_0004_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0005/testcase_0005_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0005/testcase_0005_properties.json b/system_test/replication_testsuite/testcase_0005/testcase_0005_properties.json
index 4b3f76a..02ad59d 100644
--- a/system_test/replication_testsuite/testcase_0005/testcase_0005_properties.json
+++ b/system_test/replication_testsuite/testcase_0005/testcase_0005_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0006/testcase_0006_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0006/testcase_0006_properties.json b/system_test/replication_testsuite/testcase_0006/testcase_0006_properties.json
index b9b3485..b64304f 100644
--- a/system_test/replication_testsuite/testcase_0006/testcase_0006_properties.json
+++ b/system_test/replication_testsuite/testcase_0006/testcase_0006_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0007/testcase_0007_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0007/testcase_0007_properties.json b/system_test/replication_testsuite/testcase_0007/testcase_0007_properties.json
index 5c4351f..e850709 100644
--- a/system_test/replication_testsuite/testcase_0007/testcase_0007_properties.json
+++ b/system_test/replication_testsuite/testcase_0007/testcase_0007_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0008/testcase_0008_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0008/testcase_0008_properties.json b/system_test/replication_testsuite/testcase_0008/testcase_0008_properties.json
index 79cfed8..47217cf 100644
--- a/system_test/replication_testsuite/testcase_0008/testcase_0008_properties.json
+++ b/system_test/replication_testsuite/testcase_0008/testcase_0008_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0009/testcase_0009_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0009/testcase_0009_properties.json b/system_test/replication_testsuite/testcase_0009/testcase_0009_properties.json
index a52b709..3ddaad4 100644
--- a/system_test/replication_testsuite/testcase_0009/testcase_0009_properties.json
+++ b/system_test/replication_testsuite/testcase_0009/testcase_0009_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0010/testcase_0010_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0010/testcase_0010_properties.json b/system_test/replication_testsuite/testcase_0010/testcase_0010_properties.json
index 8d4b5fe..e25ddb9 100644
--- a/system_test/replication_testsuite/testcase_0010/testcase_0010_properties.json
+++ b/system_test/replication_testsuite/testcase_0010/testcase_0010_properties.json
@@ -60,6 +60,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",

http://git-wip-us.apache.org/repos/asf/kafka/blob/c765d7bd/system_test/replication_testsuite/testcase_0011/testcase_0011_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_0011/testcase_0011_properties.json b/system_test/replication_testsuite/testcase_0011/testcase_0011_properties.json
index b03f9cf..ac17570 100644
--- a/system_test/replication_testsuite/testcase_0011/testcase_0011_properties.json
+++ b/system_test/replication_testsuite/testcase_0011/testcase_0011_properties.json
@@ -61,6 +61,7 @@
     },
     {
       "entity_id": "4",
+      "new-producer":"true",
       "topic": "test_1",
       "threads": "5",
       "compression-codec": "1",