You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/03/14 23:14:48 UTC

[2/5] KAFKA-1012; Consumer offset management in Kafka; patched by Tejas Patil and Joel Koshy; feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram Subramanian, Joe Stein, Chris Riccomini

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties b/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
deleted file mode 100644
index f1a902b..0000000
--- a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2182
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-#consumer group id
-group.id=group1
-shallow.iterator.enable=true
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/zookeeper_source_1.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/zookeeper_source_1.properties b/system_test/mirror_maker/config/zookeeper_source_1.properties
deleted file mode 100644
index f851796..0000000
--- a/system_test/mirror_maker/config/zookeeper_source_1.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/zookeeper_source-1
-# the port at which the clients will connect
-clientPort=2181

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/zookeeper_source_2.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/zookeeper_source_2.properties b/system_test/mirror_maker/config/zookeeper_source_2.properties
deleted file mode 100644
index d534d18..0000000
--- a/system_test/mirror_maker/config/zookeeper_source_2.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/zookeeper_source-2
-# the port at which the clients will connect
-clientPort=2182

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/zookeeper_target.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/zookeeper_target.properties b/system_test/mirror_maker/config/zookeeper_target.properties
deleted file mode 100644
index 55a7eb1..0000000
--- a/system_test/mirror_maker/config/zookeeper_target.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-# 
-#    http://www.apache.org/licenses/LICENSE-2.0
-# 
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/zookeeper_target
-# the port at which the clients will connect
-clientPort=2183

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker_testsuite/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py
index fd18088..c0117c6 100644
--- a/system_test/mirror_maker_testsuite/mirror_maker_test.py
+++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py
@@ -166,7 +166,7 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils):
                 time.sleep(5)
 
                 self.log_message("creating topics")
-                kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
+                kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
                 self.anonLogger.info("sleeping for 5s")
                 time.sleep(5)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/cluster_config.json b/system_test/offset_management_testsuite/cluster_config.json
new file mode 100644
index 0000000..dcca200
--- /dev/null
+++ b/system_test/offset_management_testsuite/cluster_config.json
@@ -0,0 +1,103 @@
+{
+    "cluster_config": [
+        {
+            "entity_id": "0",
+            "hostname": "localhost",
+            "role": "zookeeper",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9100"
+        },
+        {
+            "entity_id": "1",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9101"
+        },
+        {
+            "entity_id": "2",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9102"
+        },
+        {
+            "entity_id": "3",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9103"
+        },
+        {
+            "entity_id": "4",
+            "hostname": "localhost",
+            "role": "broker",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9104"
+        },
+        {
+            "entity_id": "5",
+            "hostname": "localhost",
+            "role": "producer_performance",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9105"
+        },
+        {
+            "entity_id": "6",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9106"
+        },
+        {
+            "entity_id": "7",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9107"
+        },
+        {
+            "entity_id": "8",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9108"
+        },
+        {
+            "entity_id": "9",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9109"
+        },
+        {
+            "entity_id": "10",
+            "hostname": "localhost",
+            "role": "console_consumer",
+            "cluster_name":"source",
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9110"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/config/console_consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/config/console_consumer.properties b/system_test/offset_management_testsuite/config/console_consumer.properties
new file mode 100644
index 0000000..a2ab8b9
--- /dev/null
+++ b/system_test/offset_management_testsuite/config/console_consumer.properties
@@ -0,0 +1,2 @@
+auto.offset.reset=smallest
+auto.commit.interval.ms=1000

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/config/producer_performance.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/config/producer_performance.properties b/system_test/offset_management_testsuite/config/producer_performance.properties
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/config/server.properties b/system_test/offset_management_testsuite/config/server.properties
new file mode 100644
index 0000000..2b988f8
--- /dev/null
+++ b/system_test/offset_management_testsuite/config/server.properties
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=102400
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=1
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/config/zookeeper.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/config/zookeeper.properties b/system_test/offset_management_testsuite/config/zookeeper.properties
new file mode 100644
index 0000000..5474a72
--- /dev/null
+++ b/system_test/offset_management_testsuite/config/zookeeper.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+syncLimit=5
+initLimit=10
+tickTime=2000

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/offset_management_test.py
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/offset_management_test.py b/system_test/offset_management_testsuite/offset_management_test.py
new file mode 100644
index 0000000..12b5cd2
--- /dev/null
+++ b/system_test/offset_management_testsuite/offset_management_test.py
@@ -0,0 +1,298 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#!/usr/bin/env python
+
+# ===================================
+# offset_management_test.py
+# ===================================
+
+import os
+import signal
+import sys
+import time
+import traceback
+
+from   system_test_env    import SystemTestEnv
+sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
+
+from   setup_utils        import SetupUtils
+from   replication_utils  import ReplicationUtils
+import system_test_utils
+from   testcase_env       import TestcaseEnv
+
+# product specific: Kafka
+import kafka_system_test_utils
+import metrics
+
+class OffsetManagementTest(ReplicationUtils, SetupUtils):
+
+    testModuleAbsPathName = os.path.realpath(__file__)
+    testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
+
+    def __init__(self, systemTestEnv):
+
+        # SystemTestEnv - provides cluster level environment settings
+        #     such as entity_id, hostname, kafka_home, java_home which
+        #     are available in a list of dictionary named 
+        #     "clusterEntityConfigDictList"
+        self.systemTestEnv = systemTestEnv
+
+        super(OffsetManagementTest, self).__init__(self)
+
+        # dict to pass user-defined attributes to logger argument: "extra"
+        d = {'name_of_class': self.__class__.__name__}
+
+    def signal_handler(self, signal, frame):
+        self.log_message("Interrupt detected - User pressed Ctrl+c")
+
+        # perform the necessary cleanup here when user presses Ctrl+c and it may be product specific
+        self.log_message("stopping all entities - please wait ...")
+        kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+        sys.exit(1) 
+
+    def runTest(self):
+
+        # ======================================================================
+        # get all testcase directories under this testsuite
+        # ======================================================================
+        testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
+            self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
+        testCasePathNameList.sort()
+
+        replicationUtils = ReplicationUtils(self)
+
+        # =============================================================
+        # launch each testcase one by one: testcase_1, testcase_2, ...
+        # =============================================================
+        for testCasePathName in testCasePathNameList:
+   
+            skipThisTestCase = False
+
+            try: 
+                # ======================================================================
+                # A new instance of TestcaseEnv to keep track of this testcase's env vars
+                # and initialize some env vars as testCasePathName is available now
+                # ======================================================================
+                self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
+                self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
+                self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName)
+                self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"]
+
+                # ======================================================================
+                # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json
+                # ======================================================================
+                testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"]
+
+                if self.systemTestEnv.printTestDescriptionsOnly:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    continue
+                elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName):
+                    self.log_message("Skipping : " + testcaseDirName)
+                    skipThisTestCase = True
+                    continue
+                else:
+                    self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+                    system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName)
+
+                # ============================================================================== #
+                # ============================================================================== #
+                #                   Product Specific Testing Code Starts Here:                   #
+                # ============================================================================== #
+                # ============================================================================== #
+    
+                # initialize self.testcaseEnv with user-defined environment variables (product specific)
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"]    = False
+                self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+
+                # initialize signal handler
+                signal.signal(signal.SIGINT, self.signal_handler)
+
+                # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
+                #   system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
+                    self.testcaseEnv.testcasePropJsonPathName)
+                 
+                # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
+                kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
+                # for collecting logs from remote machines
+                kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+
+                # TestcaseEnv - initialize producer & consumer config / log file pathnames
+                kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
+                # generate remote hosts log/config dirs if not exist
+                kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+    
+                # generate properties files for zookeeper, kafka, producer, and consumer:
+                # 1. copy system_test/<suite_name>_testsuite/config/*.properties to 
+                #    system_test/<suite_name>_testsuite/testcase_<n>/config/
+                # 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
+                #    by overriding the settings specified in:
+                #    system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+                kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
+                    self.testcaseEnv, self.systemTestEnv)
+               
+                # =============================================
+                # preparing all entities to start the test
+                # =============================================
+                self.log_message("starting zookeepers")
+                kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 2s")
+                time.sleep(2)
+
+                self.log_message("starting brokers")
+                kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                self.log_message("creating offset topic")
+                kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv, "__consumer_offsets", 3, 2)
+                self.anonLogger.info("sleeping for 5s")
+                time.sleep(5)
+
+                # =============================================
+                # starting producer 
+                # =============================================
+                self.log_message("starting producer in the background")
+                kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
+                msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"]
+                self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages")
+                time.sleep(int(msgProducingFreeTimeSec))
+
+                kafka_system_test_utils.start_console_consumers(self.systemTestEnv, self.testcaseEnv)
+
+                kafka_system_test_utils.get_leader_for(self.systemTestEnv, self.testcaseEnv, "__consumer_offsets", 0)
+
+                # =============================================
+                # A while-loop to bounce consumers as specified
+                # by "num_iterations" in testcase_n_properties.json
+                # =============================================
+                i = 1
+                numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
+                bouncedEntityDownTimeSec = 10
+                try:
+                    bouncedEntityDownTimeSec = int(self.testcaseEnv.testcaseArgumentsDict["bounced_entity_downtime_sec"])
+                except:
+                    pass
+
+                # group1 -> offsets partition 0 // has one consumer; eid: 6
+                # group2 -> offsets partition 1 // has four consumers; eid: 7, 8, 9, 10
+
+                offsets_0_leader_entity = kafka_system_test_utils.get_leader_for(self.systemTestEnv, self.testcaseEnv, "__consumer_offsets", 0)
+                offsets_1_leader_entity = kafka_system_test_utils.get_leader_for(self.systemTestEnv, self.testcaseEnv, "__consumer_offsets", 1)
+
+                while i <= numIterations:
+
+                    self.log_message("Iteration " + str(i) + " of " + str(numIterations))
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, offsets_0_leader_entity, self.testcaseEnv.entityBrokerParentPidDict[offsets_0_leader_entity])
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, offsets_1_leader_entity, self.testcaseEnv.entityBrokerParentPidDict[offsets_1_leader_entity])
+
+                    # =============================================
+                    # Bounce consumers if specified in testcase config
+                    # =============================================
+                    bounceConsumers = self.testcaseEnv.testcaseArgumentsDict["bounce_consumers"]
+                    self.log_message("bounce_consumers flag : " + bounceConsumers)
+                    if (bounceConsumers.lower() == "true"):
+
+                        clusterConfigList       = self.systemTestEnv.clusterEntityConfigDictList
+                        consumerEntityIdList    = system_test_utils.get_data_from_list_of_dicts( clusterConfigList, "role", "console_consumer", "entity_id")
+
+                        for stoppedConsumerEntityId in consumerEntityIdList:
+                            consumerPPID = self.testcaseEnv.entityConsoleConsumerParentPidDict[stoppedConsumerEntityId]
+                            self.log_message("stopping consumer: " + consumerPPID)
+                            kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, stoppedConsumerEntityId, consumerPPID)
+
+                        self.anonLogger.info("sleeping for " + str(bouncedEntityDownTimeSec) + " sec")
+                        time.sleep(bouncedEntityDownTimeSec)
+                        # leaders would have changed during the above bounce.
+                        self.log_message("starting the previously terminated consumers.")
+                        for stoppedConsumerEntityId in consumerEntityIdList:
+                            # starting previously terminated consumer
+                            kafka_system_test_utils.start_console_consumers(self.systemTestEnv, self.testcaseEnv, stoppedConsumerEntityId)
+
+                        self.log_message("starting the previously terminated brokers")
+                        kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, offsets_0_leader_entity)
+                        kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, offsets_1_leader_entity)
+
+                    self.anonLogger.info("sleeping for 15s")
+                    time.sleep(15)
+                    i += 1
+                # while loop
+
+                # =============================================
+                # tell producer to stop
+                # =============================================
+                self.testcaseEnv.lock.acquire()
+                self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+                time.sleep(1)
+                self.testcaseEnv.lock.release()
+                time.sleep(1)
+
+                # =============================================
+                # wait for producer thread's update of
+                # "backgroundProducerStopped" to be "True"
+                # =============================================
+                while 1:
+                    self.testcaseEnv.lock.acquire()
+                    self.logger.info("status of backgroundProducerStopped : [" + \
+                        str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
+                    if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+                        time.sleep(1)
+                        self.logger.info("all producer threads completed", extra=self.d)
+                        break
+                    time.sleep(1)
+                    self.testcaseEnv.lock.release()
+                    time.sleep(2)
+
+                self.anonLogger.info("sleeping for 15s")
+                time.sleep(15)
+
+                # =============================================
+                # this testcase is completed - stop all entities
+                # =============================================
+                self.log_message("stopping all entities")
+                for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+                for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items():
+                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+                # make sure all entities are stopped
+                kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
+
+                # =============================================
+                # collect logs from remote hosts
+                # =============================================
+                kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+                # =============================================
+                # validate the data matched and checksum
+                # =============================================
+                self.log_message("validating data matched")
+                kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv, replicationUtils)
+
+            except Exception as e:
+                self.log_message("Exception while running test {0}".format(e))
+                traceback.print_exc()
+
+            finally:
+                if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly:
+                    self.log_message("stopping all entities - please wait ...")
+                    kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json b/system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json
new file mode 100644
index 0000000..02af3e8
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json
@@ -0,0 +1,95 @@
+{
+  "description": {"01":"To Test : 'Basic offset management test.'",
+                  "02":"Set up a Zk and Kafka cluster.",
+                  "03":"Produce messages to a multiple topics - various partition counts.",
+                  "04":"Start multiple consumer groups to read various subsets of above topics.",
+                  "05":"Bounce consumers.",
+                  "06":"Verify that there are no duplicate messages or lost messages on any consumer group.",
+                  "07":"Producer dimensions : mode:sync, acks:-1, comp:0"
+  },
+  "testcase_args": {
+    "bounce_leaders": "false",
+    "bounce_consumers": "true",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50",
+    "num_topics_for_auto_generated_string":"1"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_1.log",
+      "config_filename": "kafka_server_1.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_2.log",
+      "config_filename": "kafka_server_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_3.log",
+      "config_filename": "kafka_server_3.properties"
+    },
+    {
+      "entity_id": "4",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test",
+      "threads": "3",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "1000",
+      "request-num-acks": "-1",
+      "sync":"true",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "test_0001",
+      "group.id": "group1",
+      "consumer-timeout-ms": "30000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_6.properties"
+    }
+   ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
new file mode 100644
index 0000000..41ec6e4
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=1
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_1_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=10240
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2108
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=3
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+kafka.csv.metrics.dir=/home/jkoshy/Projects/kafka/system_test/offset_management_testsuite/testcase_7002/logs/broker-1/metrics
+kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
new file mode 100644
index 0000000..727e237
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=2
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9092
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_2_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=10240
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2108
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=3
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+kafka.csv.metrics.dir=/home/jkoshy/Projects/kafka/system_test/offset_management_testsuite/testcase_7002/logs/broker-2/metrics
+kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
new file mode 100644
index 0000000..e6fbbe1
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=3
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9093
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_3_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=10240
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2108
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=3
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+kafka.csv.metrics.dir=/home/jkoshy/Projects/kafka/system_test/offset_management_testsuite/testcase_7002/logs/broker-3/metrics
+kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
new file mode 100644
index 0000000..fee65bc
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=4
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost().  If there are multiple interfaces getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9094
+
+# The number of threads handling network requests
+num.network.threads=2
+ 
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_4_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+#    2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+#    3. Throughput: The flush is generally the most expensive operation. 
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=10240
+
+# The interval at which log segments are checked to see if they can be deleted according 
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2108
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=3
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+kafka.csv.metrics.dir=/home/jkoshy/Projects/kafka/system_test/offset_management_testsuite/testcase_7002/logs/broker-4/metrics
+kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties b/system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties
new file mode 100644
index 0000000..97c07b9
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+#    http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_0
+# the port at which the clients will connect
+clientPort=2108
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+syncLimit=5
+initLimit=10
+tickTime=2000
+server.1=localhost:2107:2109

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json b/system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json
new file mode 100644
index 0000000..fdab69b
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json
@@ -0,0 +1,127 @@
+{
+  "description": {"01":"To Test : 'Basic offset management test.'",
+                  "02":"Set up a Zk and Kafka cluster.",
+                  "03":"Produce messages to a multiple topics - various partition counts.",
+                  "04":"Start multiple consumer groups to read various subsets of above topics.",
+                  "05":"Bounce consumers.",
+                  "06":"Verify that there are no duplicate messages or lost messages on any consumer group.",
+                  "07":"Producer dimensions : mode:sync, acks:-1, comp:0"
+  },
+  "testcase_args": {
+    "bounce_leaders": "false",
+    "bounce_consumers": "true",
+    "replica_factor": "3",
+    "num_partition": "1",
+    "num_iteration": "1",
+    "sleep_seconds_between_producer_calls": "1",
+    "message_producing_free_time_sec": "15",
+    "num_messages_to_produce_per_producer_call": "50",
+    "num_topics_for_auto_generated_string":"3"
+  },
+  "entities": [
+    {
+      "entity_id": "0",
+      "clientPort": "2108",
+      "dataDir": "/tmp/zookeeper_0",
+      "log_filename": "zookeeper_0.log",
+      "config_filename": "zookeeper_0.properties"
+    },
+    {
+      "entity_id": "1",
+      "port": "9091",
+      "broker.id": "1",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_1_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_1.log",
+      "config_filename": "kafka_server_1.properties"
+    },
+    {
+      "entity_id": "2",
+      "port": "9092",
+      "broker.id": "2",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_2_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_2.log",
+      "config_filename": "kafka_server_2.properties"
+    },
+    {
+      "entity_id": "3",
+      "port": "9093",
+      "broker.id": "3",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_3_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_3.log",
+      "config_filename": "kafka_server_3.properties"
+    },
+    {
+      "entity_id": "4",
+      "port": "9094",
+      "broker.id": "4",
+      "log.segment.bytes": "10240",
+      "log.dir": "/tmp/kafka_server_4_logs",
+      "default.replication.factor": "3",
+      "num.partitions": "5",
+      "log_filename": "kafka_server_4.log",
+      "config_filename": "kafka_server_4.properties"
+    },
+    {
+      "entity_id": "5",
+      "topic": "test",
+      "threads": "5",
+      "compression-codec": "0",
+      "message-size": "500",
+      "message": "1000",
+      "request-num-acks": "-1",
+      "sync":"true",
+      "producer-num-retries":"5",
+      "log_filename": "producer_performance_10.log",
+      "config_filename": "producer_performance_10.properties"
+    },
+    {
+      "entity_id": "6",
+      "topic": "test_0001",
+      "group.id": "group1",
+      "consumer-timeout-ms": "30000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_6.properties"
+    },
+    {
+      "entity_id": "7",
+      "topic": "test_0002",
+      "group.id": "group2",
+      "consumer-timeout-ms": "30000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_7.properties"
+    },
+    {
+      "entity_id": "8",
+      "topic": "test_0002",
+      "group.id": "group2",
+      "consumer-timeout-ms": "30000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_8.properties"
+    },
+    {
+      "entity_id": "9",
+      "topic": "test_0002",
+      "group.id": "group2",
+      "consumer-timeout-ms": "30000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_9.properties"
+    },
+    {
+      "entity_id": "10",
+      "topic": "test_0003",
+      "group.id": "group2",
+      "consumer-timeout-ms": "30000",
+      "log_filename": "console_consumer.log",
+      "config_filename": "console_consumer_10.properties"
+    }
+   ]
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index 5d3d93e..660006c 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -188,7 +188,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
 
                 if autoCreateTopic.lower() == "false":
                     self.log_message("creating topics")
-                    kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
+                    kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
                     self.anonLogger.info("sleeping for 5s")
                     time.sleep(5)