Posted to commits@kafka.apache.org by jj...@apache.org on 2014/03/14 23:14:48 UTC
[2/5] KAFKA-1012; Consumer offset management in Kafka;
patched by Tejas Patil and Joel Koshy;
feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram
Subramanian, Joe Stein, Chris Riccomini
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties b/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
deleted file mode 100644
index f1a902b..0000000
--- a/system_test/mirror_maker/config/whitelisttest_2.consumer.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# see kafka.consumer.ConsumerConfig for more details
-
-# zk connection string
-# comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
-zk.connect=localhost:2182
-
-# timeout in ms for connecting to zookeeper
-zk.connection.timeout.ms=1000000
-
-#consumer group id
-group.id=group1
-shallow.iterator.enable=true
-
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/zookeeper_source_1.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/zookeeper_source_1.properties b/system_test/mirror_maker/config/zookeeper_source_1.properties
deleted file mode 100644
index f851796..0000000
--- a/system_test/mirror_maker/config/zookeeper_source_1.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/zookeeper_source-1
-# the port at which the clients will connect
-clientPort=2181
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/zookeeper_source_2.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/zookeeper_source_2.properties b/system_test/mirror_maker/config/zookeeper_source_2.properties
deleted file mode 100644
index d534d18..0000000
--- a/system_test/mirror_maker/config/zookeeper_source_2.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/zookeeper_source-2
-# the port at which the clients will connect
-clientPort=2182
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker/config/zookeeper_target.properties
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker/config/zookeeper_target.properties b/system_test/mirror_maker/config/zookeeper_target.properties
deleted file mode 100644
index 55a7eb1..0000000
--- a/system_test/mirror_maker/config/zookeeper_target.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=/tmp/zookeeper_target
-# the port at which the clients will connect
-clientPort=2183
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/mirror_maker_testsuite/mirror_maker_test.py
----------------------------------------------------------------------
diff --git a/system_test/mirror_maker_testsuite/mirror_maker_test.py b/system_test/mirror_maker_testsuite/mirror_maker_test.py
index fd18088..c0117c6 100644
--- a/system_test/mirror_maker_testsuite/mirror_maker_test.py
+++ b/system_test/mirror_maker_testsuite/mirror_maker_test.py
@@ -166,7 +166,7 @@ class MirrorMakerTest(ReplicationUtils, SetupUtils):
time.sleep(5)
self.log_message("creating topics")
- kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/cluster_config.json b/system_test/offset_management_testsuite/cluster_config.json
new file mode 100644
index 0000000..dcca200
--- /dev/null
+++ b/system_test/offset_management_testsuite/cluster_config.json
@@ -0,0 +1,103 @@
+{
+ "cluster_config": [
+ {
+ "entity_id": "0",
+ "hostname": "localhost",
+ "role": "zookeeper",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9100"
+ },
+ {
+ "entity_id": "1",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9101"
+ },
+ {
+ "entity_id": "2",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9102"
+ },
+ {
+ "entity_id": "3",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9103"
+ },
+ {
+ "entity_id": "4",
+ "hostname": "localhost",
+ "role": "broker",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9104"
+ },
+ {
+ "entity_id": "5",
+ "hostname": "localhost",
+ "role": "producer_performance",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9105"
+ },
+ {
+ "entity_id": "6",
+ "hostname": "localhost",
+ "role": "console_consumer",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9106"
+ },
+ {
+ "entity_id": "7",
+ "hostname": "localhost",
+ "role": "console_consumer",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9107"
+ },
+ {
+ "entity_id": "8",
+ "hostname": "localhost",
+ "role": "console_consumer",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9108"
+ },
+ {
+ "entity_id": "9",
+ "hostname": "localhost",
+ "role": "console_consumer",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9109"
+ },
+ {
+ "entity_id": "10",
+ "hostname": "localhost",
+ "role": "console_consumer",
+ "cluster_name":"source",
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9110"
+ }
+ ]
+}
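
For context, the harness reads this file as a flat list of entity dicts. A minimal
sketch of loading and sanity-checking it, assuming the layout above (the uniqueness
checks are illustrative, not part of the committed suite):

import json

def load_cluster_config(path="cluster_config.json"):
    with open(path) as f:
        entities = json.load(f)["cluster_config"]
    # every entity needs a unique id and a unique jmx_port
    ids = [e["entity_id"] for e in entities]
    jmx_ports = [e["jmx_port"] for e in entities]
    assert len(set(ids)) == len(ids), "duplicate entity_id"
    assert len(set(jmx_ports)) == len(jmx_ports), "duplicate jmx_port"
    return entities

entities = load_cluster_config()
brokers = [e for e in entities if e["role"] == "broker"]
print("%d entities, %d brokers" % (len(entities), len(brokers)))  # 11 entities, 4 brokers
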
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/config/console_consumer.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/config/console_consumer.properties b/system_test/offset_management_testsuite/config/console_consumer.properties
new file mode 100644
index 0000000..a2ab8b9
--- /dev/null
+++ b/system_test/offset_management_testsuite/config/console_consumer.properties
@@ -0,0 +1,2 @@
+auto.offset.reset=smallest
+auto.commit.interval.ms=1000
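
These two settings make a console consumer with no committed offset start from the
earliest available message, and auto-commit its progress roughly once per second across
bounces. A minimal sketch of reading such Java-style properties files from the Python
harness side (hypothetical helper, assuming plain key=value lines):

def load_properties(path):
    # parse simple key=value lines; skip blanks and '#' comments
    props = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
    return props

consumer_props = load_properties("console_consumer.properties")
assert consumer_props["auto.offset.reset"] == "smallest"
assert int(consumer_props["auto.commit.interval.ms"]) == 1000
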
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/config/producer_performance.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/config/producer_performance.properties b/system_test/offset_management_testsuite/config/producer_performance.properties
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/config/server.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/config/server.properties b/system_test/offset_management_testsuite/config/server.properties
new file mode 100644
index 0000000..2b988f8
--- /dev/null
+++ b/system_test/offset_management_testsuite/config/server.properties
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost(). If there are multiple interfaces, getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+# 3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=102400
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=1
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
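
Note that offsets.topic.replication.factor=4 matches the broker count in this suite's
cluster_config.json, so each offsets partition would have a replica on every broker and
bouncing any offsets-partition leader forces a real failover. A hedged sketch of that
cross-check, with the values inlined from the files above (illustrative, not harness code):

# values copied from server.properties and cluster_config.json above
offsets_partitions = 2
offsets_replication_factor = 4
num_brokers = 4  # entity_ids 1-4 in cluster_config.json

if offsets_replication_factor > num_brokers:
    raise ValueError("offsets topic RF %d exceeds broker count %d"
                     % (offsets_replication_factor, num_brokers))

# With RF equal to the broker count, stopping any offsets-partition leader
# exercises offset-manager failover rather than just a clean leader move.
print("offsets topic: %d partitions, RF %d across %d brokers"
      % (offsets_partitions, offsets_replication_factor, num_brokers))
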
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/config/zookeeper.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/config/zookeeper.properties b/system_test/offset_management_testsuite/config/zookeeper.properties
new file mode 100644
index 0000000..5474a72
--- /dev/null
+++ b/system_test/offset_management_testsuite/config/zookeeper.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+syncLimit=5
+initLimit=10
+tickTime=2000
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/offset_management_test.py
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/offset_management_test.py b/system_test/offset_management_testsuite/offset_management_test.py
new file mode 100644
index 0000000..12b5cd2
--- /dev/null
+++ b/system_test/offset_management_testsuite/offset_management_test.py
@@ -0,0 +1,298 @@
+#!/usr/bin/env python
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# ===================================
+# offset_management_test.py
+# ===================================
+
+import os
+import signal
+import sys
+import time
+import traceback
+
+from system_test_env import SystemTestEnv
+sys.path.append(SystemTestEnv.SYSTEM_TEST_UTIL_DIR)
+
+from setup_utils import SetupUtils
+from replication_utils import ReplicationUtils
+import system_test_utils
+from testcase_env import TestcaseEnv
+
+# product specific: Kafka
+import kafka_system_test_utils
+import metrics
+
+class OffsetManagementTest(ReplicationUtils, SetupUtils):
+
+ testModuleAbsPathName = os.path.realpath(__file__)
+ testSuiteAbsPathName = os.path.abspath(os.path.dirname(testModuleAbsPathName))
+
+ def __init__(self, systemTestEnv):
+
+ # SystemTestEnv - provides cluster level environment settings
+ # such as entity_id, hostname, kafka_home, java_home which
+ # are available in a list of dictionary named
+ # "clusterEntityConfigDictList"
+ self.systemTestEnv = systemTestEnv
+
+ super(OffsetManagementTest, self).__init__(self)
+
+ # dict to pass user-defined attributes to logger argument: "extra"
+ self.d = {'name_of_class': self.__class__.__name__}
+
+ def signal_handler(self, signal, frame):
+ self.log_message("Interrupt detected - User pressed Ctrl+c")
+
+ # perform any necessary, possibly product-specific cleanup when the user presses Ctrl+c
+ self.log_message("stopping all entities - please wait ...")
+ kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+ sys.exit(1)
+
+ def runTest(self):
+
+ # ======================================================================
+ # get all testcase directories under this testsuite
+ # ======================================================================
+ testCasePathNameList = system_test_utils.get_dir_paths_with_prefix(
+ self.testSuiteAbsPathName, SystemTestEnv.SYSTEM_TEST_CASE_PREFIX)
+ testCasePathNameList.sort()
+
+ replicationUtils = ReplicationUtils(self)
+
+ # =============================================================
+ # launch each testcase one by one: testcase_1, testcase_2, ...
+ # =============================================================
+ for testCasePathName in testCasePathNameList:
+
+ skipThisTestCase = False
+
+ try:
+ # ======================================================================
+ # A new instance of TestcaseEnv to keep track of this testcase's env vars
+ # and initialize some env vars as testCasePathName is available now
+ # ======================================================================
+ self.testcaseEnv = TestcaseEnv(self.systemTestEnv, self)
+ self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
+ self.testcaseEnv.initWithKnownTestCasePathName(testCasePathName)
+ self.testcaseEnv.testcaseArgumentsDict = self.testcaseEnv.testcaseNonEntityDataDict["testcase_args"]
+
+ # ======================================================================
+ # SKIP if this case is IN testcase_to_skip.json or NOT IN testcase_to_run.json
+ # ======================================================================
+ testcaseDirName = self.testcaseEnv.testcaseResultsDict["_test_case_name"]
+
+ if self.systemTestEnv.printTestDescriptionsOnly:
+ self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+ continue
+ elif self.systemTestEnv.isTestCaseToSkip(self.__class__.__name__, testcaseDirName):
+ self.log_message("Skipping : " + testcaseDirName)
+ skipThisTestCase = True
+ continue
+ else:
+ self.testcaseEnv.printTestCaseDescription(testcaseDirName)
+ system_test_utils.setup_remote_hosts_with_testcase_level_cluster_config(self.systemTestEnv, testCasePathName)
+
+ # ============================================================================== #
+ # ============================================================================== #
+ # Product Specific Testing Code Starts Here: #
+ # ============================================================================== #
+ # ============================================================================== #
+
+ # initialize self.testcaseEnv with user-defined environment variables (product specific)
+ self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = False
+ self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = False
+
+ # initialize signal handler
+ signal.signal(signal.SIGINT, self.signal_handler)
+
+ # TestcaseEnv.testcaseConfigsList initialized by reading testcase properties file:
+ # system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+ self.testcaseEnv.testcaseConfigsList = system_test_utils.get_json_list_data(
+ self.testcaseEnv.testcasePropJsonPathName)
+
+ # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
+ kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+ # create "LOCAL" log directories for metrics, dashboards for each entity under this testcase
+ # for collecting logs from remote machines
+ kafka_system_test_utils.generate_testcase_log_dirs(self.systemTestEnv, self.testcaseEnv)
+
+ # TestcaseEnv - initialize producer & consumer config / log file pathnames
+ kafka_system_test_utils.init_entity_props(self.systemTestEnv, self.testcaseEnv)
+
+ # generate remote hosts log/config dirs if not exist
+ kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+ # generate properties files for zookeeper, kafka, producer, and consumer:
+ # 1. copy system_test/<suite_name>_testsuite/config/*.properties to
+ # system_test/<suite_name>_testsuite/testcase_<n>/config/
+ # 2. update all properties files in system_test/<suite_name>_testsuite/testcase_<n>/config
+ # by overriding the settings specified in:
+ # system_test/<suite_name>_testsuite/testcase_<n>/testcase_<n>_properties.json
+ kafka_system_test_utils.generate_overriden_props_files(self.testSuiteAbsPathName,
+ self.testcaseEnv, self.systemTestEnv)
+
+ # =============================================
+ # preparing all entities to start the test
+ # =============================================
+ self.log_message("starting zookeepers")
+ kafka_system_test_utils.start_zookeepers(self.systemTestEnv, self.testcaseEnv)
+ self.anonLogger.info("sleeping for 2s")
+ time.sleep(2)
+
+ self.log_message("starting brokers")
+ kafka_system_test_utils.start_brokers(self.systemTestEnv, self.testcaseEnv)
+ self.anonLogger.info("sleeping for 5s")
+ time.sleep(5)
+
+ self.log_message("creating offset topic")
+ kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv, "__consumer_offsets", 3, 2)
+ self.anonLogger.info("sleeping for 5s")
+ time.sleep(5)
+
+ # =============================================
+ # starting producer
+ # =============================================
+ self.log_message("starting producer in the background")
+ kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv, False)
+ msgProducingFreeTimeSec = self.testcaseEnv.testcaseArgumentsDict["message_producing_free_time_sec"]
+ self.anonLogger.info("sleeping for " + msgProducingFreeTimeSec + " sec to produce some messages")
+ time.sleep(int(msgProducingFreeTimeSec))
+
+ kafka_system_test_utils.start_console_consumers(self.systemTestEnv, self.testcaseEnv)
+
+ kafka_system_test_utils.get_leader_for(self.systemTestEnv, self.testcaseEnv, "__consumer_offsets", 0)
+
+ # =============================================
+ # A while-loop to bounce consumers as specified
+ # by "num_iterations" in testcase_n_properties.json
+ # =============================================
+ i = 1
+ numIterations = int(self.testcaseEnv.testcaseArgumentsDict["num_iteration"])
+ bouncedEntityDownTimeSec = 10
+ try:
+ bouncedEntityDownTimeSec = int(self.testcaseEnv.testcaseArgumentsDict["bounced_entity_downtime_sec"])
+ except KeyError:
+ pass # bounced_entity_downtime_sec not specified; keep the 10s default
+
+ # group1 -> offsets partition 0 // has one consumer; eid: 6
+ # group2 -> offsets partition 1 // has four consumers; eid: 7, 8, 9, 10
+
+ offsets_0_leader_entity = kafka_system_test_utils.get_leader_for(self.systemTestEnv, self.testcaseEnv, "__consumer_offsets", 0)
+ offsets_1_leader_entity = kafka_system_test_utils.get_leader_for(self.systemTestEnv, self.testcaseEnv, "__consumer_offsets", 1)
+
+ while i <= numIterations:
+
+ self.log_message("Iteration " + str(i) + " of " + str(numIterations))
+ kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, offsets_0_leader_entity, self.testcaseEnv.entityBrokerParentPidDict[offsets_0_leader_entity])
+ kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, offsets_1_leader_entity, self.testcaseEnv.entityBrokerParentPidDict[offsets_1_leader_entity])
+
+ # =============================================
+ # Bounce consumers if specified in testcase config
+ # =============================================
+ bounceConsumers = self.testcaseEnv.testcaseArgumentsDict["bounce_consumers"]
+ self.log_message("bounce_consumers flag : " + bounceConsumers)
+ if (bounceConsumers.lower() == "true"):
+
+ clusterConfigList = self.systemTestEnv.clusterEntityConfigDictList
+ consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( clusterConfigList, "role", "console_consumer", "entity_id")
+
+ for stoppedConsumerEntityId in consumerEntityIdList:
+ consumerPPID = self.testcaseEnv.entityConsoleConsumerParentPidDict[stoppedConsumerEntityId]
+ self.log_message("stopping consumer: " + consumerPPID)
+ kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, stoppedConsumerEntityId, consumerPPID)
+
+ self.anonLogger.info("sleeping for " + str(bouncedEntityDownTimeSec) + " sec")
+ time.sleep(bouncedEntityDownTimeSec)
+ # leaders would have changed during the above bounce.
+ self.log_message("starting the previously terminated consumers.")
+ for stoppedConsumerEntityId in consumerEntityIdList:
+ # starting previously terminated consumer
+ kafka_system_test_utils.start_console_consumers(self.systemTestEnv, self.testcaseEnv, stoppedConsumerEntityId)
+
+ self.log_message("starting the previously terminated brokers")
+ kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, offsets_0_leader_entity)
+ kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, offsets_1_leader_entity)
+
+ self.anonLogger.info("sleeping for 15s")
+ time.sleep(15)
+ i += 1
+ # while loop
+
+ # =============================================
+ # tell producer to stop
+ # =============================================
+ self.testcaseEnv.lock.acquire()
+ self.testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+ time.sleep(1)
+ self.testcaseEnv.lock.release()
+ time.sleep(1)
+
+ # =============================================
+ # wait for producer thread's update of
+ # "backgroundProducerStopped" to be "True"
+ # =============================================
+ while 1:
+ self.testcaseEnv.lock.acquire()
+ self.logger.info("status of backgroundProducerStopped : [" + \
+ str(self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=self.d)
+ if self.testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+ time.sleep(1)
+ self.logger.info("all producer threads completed", extra=self.d)
+ break
+ time.sleep(1)
+ self.testcaseEnv.lock.release()
+ time.sleep(2)
+
+ self.anonLogger.info("sleeping for 15s")
+ time.sleep(15)
+
+ # =============================================
+ # this testcase is completed - stop all entities
+ # =============================================
+ self.log_message("stopping all entities")
+ for entityId, parentPid in self.testcaseEnv.entityBrokerParentPidDict.items():
+ kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+ for entityId, parentPid in self.testcaseEnv.entityZkParentPidDict.items():
+ kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+ # make sure all entities are stopped
+ kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
+
+ # =============================================
+ # collect logs from remote hosts
+ # =============================================
+ kafka_system_test_utils.collect_logs_from_remote_hosts(self.systemTestEnv, self.testcaseEnv)
+
+ # =============================================
+ # validate the data matched and checksum
+ # =============================================
+ self.log_message("validating data matched")
+ kafka_system_test_utils.validate_data_matched_in_multi_topics_from_single_consumer_producer(self.systemTestEnv, self.testcaseEnv, replicationUtils)
+
+ except Exception as e:
+ self.log_message("Exception while running test {0}".format(e))
+ traceback.print_exc()
+
+ finally:
+ if not skipThisTestCase and not self.systemTestEnv.printTestDescriptionsOnly:
+ self.log_message("stopping all entities - please wait ...")
+ kafka_system_test_utils.stop_all_remote_running_processes(self.systemTestEnv, self.testcaseEnv)
+
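
The producer shutdown at the end of runTest is a two-flag handshake guarded by a shared
lock: the test raises "stopBackgroundProducer", then spins until the producer thread
acknowledges via "backgroundProducerStopped". A standalone sketch of the same pattern
(only the flag names come from the test above; the producer body is a stand-in):

import threading
import time

flags = {"stopBackgroundProducer": False, "backgroundProducerStopped": False}
lock = threading.Lock()

def background_producer():
    while True:
        with lock:
            if flags["stopBackgroundProducer"]:
                flags["backgroundProducerStopped"] = True
                return
        time.sleep(0.1)  # a real producer would send a batch here

producer = threading.Thread(target=background_producer)
producer.start()

with lock:                       # signal the producer to stop
    flags["stopBackgroundProducer"] = True

while True:                      # wait for the acknowledgement
    with lock:
        if flags["backgroundProducerStopped"]:
            break
    time.sleep(0.1)
producer.join()
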
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json b/system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json
new file mode 100644
index 0000000..02af3e8
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json
@@ -0,0 +1,95 @@
+{
+ "description": {"01":"To Test : 'Basic offset management test.'",
+ "02":"Set up a Zk and Kafka cluster.",
+ "03":"Produce messages to a multiple topics - various partition counts.",
+ "04":"Start multiple consumer groups to read various subsets of above topics.",
+ "05":"Bounce consumers.",
+ "06":"Verify that there are no duplicate messages or lost messages on any consumer group.",
+ "07":"Producer dimensions : mode:sync, acks:-1, comp:0"
+ },
+ "testcase_args": {
+ "bounce_leaders": "false",
+ "bounce_consumers": "true",
+ "replica_factor": "3",
+ "num_partition": "1",
+ "num_iteration": "1",
+ "sleep_seconds_between_producer_calls": "1",
+ "message_producing_free_time_sec": "15",
+ "num_messages_to_produce_per_producer_call": "50",
+ "num_topics_for_auto_generated_string":"1"
+ },
+ "entities": [
+ {
+ "entity_id": "0",
+ "clientPort": "2108",
+ "dataDir": "/tmp/zookeeper_0",
+ "log_filename": "zookeeper_0.log",
+ "config_filename": "zookeeper_0.properties"
+ },
+ {
+ "entity_id": "1",
+ "port": "9091",
+ "broker.id": "1",
+ "log.segment.bytes": "10240",
+ "log.dir": "/tmp/kafka_server_1_logs",
+ "default.replication.factor": "3",
+ "num.partitions": "5",
+ "log_filename": "kafka_server_1.log",
+ "config_filename": "kafka_server_1.properties"
+ },
+ {
+ "entity_id": "2",
+ "port": "9092",
+ "broker.id": "2",
+ "log.segment.bytes": "10240",
+ "log.dir": "/tmp/kafka_server_2_logs",
+ "default.replication.factor": "3",
+ "num.partitions": "5",
+ "log_filename": "kafka_server_2.log",
+ "config_filename": "kafka_server_2.properties"
+ },
+ {
+ "entity_id": "3",
+ "port": "9093",
+ "broker.id": "3",
+ "log.segment.bytes": "10240",
+ "log.dir": "/tmp/kafka_server_3_logs",
+ "default.replication.factor": "3",
+ "num.partitions": "5",
+ "log_filename": "kafka_server_3.log",
+ "config_filename": "kafka_server_3.properties"
+ },
+ {
+ "entity_id": "4",
+ "port": "9094",
+ "broker.id": "4",
+ "log.segment.bytes": "10240",
+ "log.dir": "/tmp/kafka_server_4_logs",
+ "default.replication.factor": "3",
+ "num.partitions": "5",
+ "log_filename": "kafka_server_4.log",
+ "config_filename": "kafka_server_4.properties"
+ },
+ {
+ "entity_id": "5",
+ "topic": "test",
+ "threads": "3",
+ "compression-codec": "0",
+ "message-size": "500",
+ "message": "1000",
+ "request-num-acks": "-1",
+ "sync":"true",
+ "producer-num-retries":"5",
+ "log_filename": "producer_performance_10.log",
+ "config_filename": "producer_performance_10.properties"
+ },
+ {
+ "entity_id": "6",
+ "topic": "test_0001",
+ "group.id": "group1",
+ "consumer-timeout-ms": "30000",
+ "log_filename": "console_consumer.log",
+ "config_filename": "console_consumer_6.properties"
+ }
+ ]
+}
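
For context, generate_overriden_props_files (called from runTest above) copies the
suite-level config/*.properties into testcase_7001/config/ and overlays the per-entity
keys from this JSON. A simplified sketch of that merge, assuming entity_id,
log_filename and config_filename are harness metadata rather than broker properties
(illustrative, not the real implementation):

import json

METADATA_KEYS = {"entity_id", "log_filename", "config_filename"}

def overridden_props(suite_defaults, testcase_json_path, entity_id):
    with open(testcase_json_path) as f:
        entities = json.load(f)["entities"]
    overrides = next(e for e in entities if e["entity_id"] == entity_id)
    merged = dict(suite_defaults)
    for key, value in overrides.items():
        if key not in METADATA_KEYS:
            merged[key] = value  # e.g. broker.id, log.segment.bytes, num.partitions
    return merged

# broker 1: the suite default of 102400 is overridden to 10240 by this JSON
props = overridden_props({"log.segment.bytes": "102400"},
                         "testcase_7001_properties.json", "1")
assert props["log.segment.bytes"] == "10240"
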
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
new file mode 100644
index 0000000..41ec6e4
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=1
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost(). If there are multiple interfaces, getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9091
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_1_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+# 3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=10240
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2108
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=3
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+kafka.csv.metrics.dir=/home/jkoshy/Projects/kafka/system_test/offset_management_testsuite/testcase_7002/logs/broker-1/metrics
+kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
new file mode 100644
index 0000000..727e237
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=2
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost(). If there are multiple interfaces, getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9092
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_2_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+# 3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=10240
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2108
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=3
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+kafka.csv.metrics.dir=/home/jkoshy/Projects/kafka/system_test/offset_management_testsuite/testcase_7002/logs/broker-2/metrics
+kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
new file mode 100644
index 0000000..e6fbbe1
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=3
+
+# Hostname the broker will advertise to consumers. If not set, kafka will use the value returned
+# from InetAddress.getLocalHost(). If there are multiple interfaces, getLocalHost
+# may not be what you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9093
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_3_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in kafka.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+# 3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.size=536870912
+log.segment.bytes=10240
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2108
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=3
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+kafka.csv.metrics.dir=/home/jkoshy/Projects/kafka/system_test/offset_management_testsuite/testcase_7002/logs/broker-3/metrics
+kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
new file mode 100644
index 0000000..fee65bc
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=4
+
+# Hostname the broker will advertise to consumers. If not set, Kafka will use the value returned
+# from InetAddress.getLocalHost(). If there are multiple interfaces, getLocalHost()
+# may not return the address you want.
+#host.name=
+
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port=9094
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=2
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# The directory under which to store log files
+log.dir=/tmp/kafka_server_4_logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=5
+
+# Overrides for the default given by num.partitions on a per-topic basis
+#topic.partition.count.map=topic1:3, topic2:4
+
+############################# Log Flush Policy #############################
+
+# The following configurations control the flush of data to disk. This is the most
+# important performance knob in Kafka.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data is at greater risk of loss in the event of a crash.
+# 2. Latency: Data is not made available to consumers until it is flushed (which adds latency).
+# 3. Throughput: The flush is generally the most expensive operation.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+log.flush.interval.ms=1000
+
+# Per-topic overrides for log.flush.interval.ms
+#log.flush.intervals.ms.per.topic=topic1:1000, topic2:3000
+
+# The interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.flush.scheduler.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the oldest end of the log as
+# long as the total size of the remaining segments doesn't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+log.retention.bytes=-1
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+#log.segment.bytes=536870912
+log.segment.bytes=10240
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.cleanup.interval.mins=1
+
+############################# Zookeeper #############################
+
+# Enable connecting to zookeeper
+enable.zookeeper=true
+
+# Zk connection string (see zk docs for details).
+# This is a comma-separated list of host:port pairs, each corresponding to a
+# zk server, e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the URLs to specify the
+# root directory for all Kafka znodes.
+zookeeper.connect=localhost:2108
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
+
+monitoring.period.secs=1
+message.max.bytes=1000000
+queued.max.requests=500
+log.roll.hours=168
+log.index.size.max.bytes=10485760
+log.index.interval.bytes=4096
+auto.create.topics.enable=true
+controller.socket.timeout.ms=30000
+controller.message.queue.size=10
+default.replication.factor=3
+replica.lag.time.max.ms=10000
+replica.lag.max.messages=4000
+replica.socket.timeout.ms=30000
+replica.socket.receive.buffer.bytes=65536
+replica.fetch.max.bytes=1048576
+replica.fetch.wait.max.ms=500
+replica.fetch.min.bytes=4096
+num.replica.fetchers=1
+
+offsets.topic.num.partitions=2
+offsets.topic.replication.factor=4
+
+kafka.csv.metrics.dir=/home/jkoshy/Projects/kafka/system_test/offset_management_testsuite/testcase_7002/logs/broker-4/metrics
+kafka.csv.metrics.reporter.enabled=true
+kafka.metrics.polling.interval.secs=5
+kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
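
The Log Flush Policy comments in this file describe flushing after every N messages (log.flush.interval.messages) or after a maximum dwell time (log.flush.interval.ms), whichever is hit first, with the check driven by log.flush.scheduler.interval.ms. A small illustrative sketch of that decision, using hypothetical names (this is not the broker's implementation):

    import time

    class FlushPolicy:
        """Illustrates log.flush.interval.messages / log.flush.interval.ms."""

        def __init__(self, interval_messages=10000, interval_ms=1000):
            self.interval_messages = interval_messages
            self.interval_ms = interval_ms
            self.unflushed = 0
            self.last_flush_ms = time.time() * 1000

        def record_append(self):
            self.unflushed += 1

        def should_flush(self):
            # Flush when either bound is reached, whichever comes first.
            elapsed_ms = time.time() * 1000 - self.last_flush_ms
            return (self.unflushed >= self.interval_messages
                    or elapsed_ms >= self.interval_ms)

        def note_flushed(self):
            self.unflushed = 0
            self.last_flush_ms = time.time() * 1000

A scheduler thread would poll should_flush() once per log.flush.scheduler.interval.ms and call note_flushed() after each successful flush.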
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties b/system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties
new file mode 100644
index 0000000..97c07b9
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper_0
+# the port at which the clients will connect
+clientPort=2108
+# disable the per-IP limit on the number of connections since this is a non-production config
+maxClientCnxns=0
+syncLimit=5
+initLimit=10
+tickTime=2000
+server.1=localhost:2107:2109
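
In the ZooKeeper config above, initLimit and syncLimit are measured in ticks rather than milliseconds, and server.1=localhost:2107:2109 gives server 1's quorum port (2107) and leader-election port (2109). Converting the limits to wall-clock time:

    tick_ms = 2000                # tickTime
    init_limit_ms = 10 * tick_ms  # initLimit: 20000 ms for followers to connect and sync
    sync_limit_ms = 5 * tick_ms   # syncLimit: 10000 ms for followers to stay in sync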
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json
----------------------------------------------------------------------
diff --git a/system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json b/system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json
new file mode 100644
index 0000000..fdab69b
--- /dev/null
+++ b/system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json
@@ -0,0 +1,127 @@
+{
+ "description": {"01":"To Test : 'Basic offset management test.'",
+ "02":"Set up a Zk and Kafka cluster.",
+ "03":"Produce messages to a multiple topics - various partition counts.",
+ "04":"Start multiple consumer groups to read various subsets of above topics.",
+ "05":"Bounce consumers.",
+ "06":"Verify that there are no duplicate messages or lost messages on any consumer group.",
+ "07":"Producer dimensions : mode:sync, acks:-1, comp:0"
+ },
+ "testcase_args": {
+ "bounce_leaders": "false",
+ "bounce_consumers": "true",
+ "replica_factor": "3",
+ "num_partition": "1",
+ "num_iteration": "1",
+ "sleep_seconds_between_producer_calls": "1",
+ "message_producing_free_time_sec": "15",
+ "num_messages_to_produce_per_producer_call": "50",
+ "num_topics_for_auto_generated_string":"3"
+ },
+ "entities": [
+ {
+ "entity_id": "0",
+ "clientPort": "2108",
+ "dataDir": "/tmp/zookeeper_0",
+ "log_filename": "zookeeper_0.log",
+ "config_filename": "zookeeper_0.properties"
+ },
+ {
+ "entity_id": "1",
+ "port": "9091",
+ "broker.id": "1",
+ "log.segment.bytes": "10240",
+ "log.dir": "/tmp/kafka_server_1_logs",
+ "default.replication.factor": "3",
+ "num.partitions": "5",
+ "log_filename": "kafka_server_1.log",
+ "config_filename": "kafka_server_1.properties"
+ },
+ {
+ "entity_id": "2",
+ "port": "9092",
+ "broker.id": "2",
+ "log.segment.bytes": "10240",
+ "log.dir": "/tmp/kafka_server_2_logs",
+ "default.replication.factor": "3",
+ "num.partitions": "5",
+ "log_filename": "kafka_server_2.log",
+ "config_filename": "kafka_server_2.properties"
+ },
+ {
+ "entity_id": "3",
+ "port": "9093",
+ "broker.id": "3",
+ "log.segment.bytes": "10240",
+ "log.dir": "/tmp/kafka_server_3_logs",
+ "default.replication.factor": "3",
+ "num.partitions": "5",
+ "log_filename": "kafka_server_3.log",
+ "config_filename": "kafka_server_3.properties"
+ },
+ {
+ "entity_id": "4",
+ "port": "9094",
+ "broker.id": "4",
+ "log.segment.bytes": "10240",
+ "log.dir": "/tmp/kafka_server_4_logs",
+ "default.replication.factor": "3",
+ "num.partitions": "5",
+ "log_filename": "kafka_server_4.log",
+ "config_filename": "kafka_server_4.properties"
+ },
+ {
+ "entity_id": "5",
+ "topic": "test",
+ "threads": "5",
+ "compression-codec": "0",
+ "message-size": "500",
+ "message": "1000",
+ "request-num-acks": "-1",
+ "sync":"true",
+ "producer-num-retries":"5",
+ "log_filename": "producer_performance_10.log",
+ "config_filename": "producer_performance_10.properties"
+ },
+ {
+ "entity_id": "6",
+ "topic": "test_0001",
+ "group.id": "group1",
+ "consumer-timeout-ms": "30000",
+ "log_filename": "console_consumer.log",
+ "config_filename": "console_consumer_6.properties"
+ },
+ {
+ "entity_id": "7",
+ "topic": "test_0002",
+ "group.id": "group2",
+ "consumer-timeout-ms": "30000",
+ "log_filename": "console_consumer.log",
+ "config_filename": "console_consumer_7.properties"
+ },
+ {
+ "entity_id": "8",
+ "topic": "test_0002",
+ "group.id": "group2",
+ "consumer-timeout-ms": "30000",
+ "log_filename": "console_consumer.log",
+ "config_filename": "console_consumer_8.properties"
+ },
+ {
+ "entity_id": "9",
+ "topic": "test_0002",
+ "group.id": "group2",
+ "consumer-timeout-ms": "30000",
+ "log_filename": "console_consumer.log",
+ "config_filename": "console_consumer_9.properties"
+ },
+ {
+ "entity_id": "10",
+ "topic": "test_0003",
+ "group.id": "group2",
+ "consumer-timeout-ms": "30000",
+ "log_filename": "console_consumer.log",
+ "config_filename": "console_consumer_10.properties"
+ }
+ ]
+}
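
Step 06 of the description above checks that bounced consumer groups neither lose nor duplicate messages. A minimal sketch of that kind of check, assuming each produced message carries a unique ID (an illustration of the idea, not the test suite's actual validator):

    from collections import Counter

    def validate_group(produced_ids, consumed_ids):
        """Compare produced vs. consumed message IDs for one consumer group."""
        produced = Counter(produced_ids)
        consumed = Counter(consumed_ids)
        lost = produced - consumed        # produced but never consumed
        duplicated = consumed - produced  # consumed more often than produced
        return dict(lost), dict(duplicated)

    # Example: "m2" was lost, "m1" was delivered twice.
    lost, dup = validate_group(["m1", "m2", "m3"], ["m1", "m1", "m3"])
    assert lost == {"m2": 1} and dup == {"m1": 1}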
http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/system_test/replication_testsuite/replica_basic_test.py
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/replica_basic_test.py b/system_test/replication_testsuite/replica_basic_test.py
index 5d3d93e..660006c 100644
--- a/system_test/replication_testsuite/replica_basic_test.py
+++ b/system_test/replication_testsuite/replica_basic_test.py
@@ -188,7 +188,7 @@ class ReplicaBasicTest(ReplicationUtils, SetupUtils):
if autoCreateTopic.lower() == "false":
self.log_message("creating topics")
- kafka_system_test_utils.create_topic(self.systemTestEnv, self.testcaseEnv)
+ kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
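
The one-line change above points the replication test at a topic-creation helper dedicated to producer-performance topics. The surrounding pattern, create the topics and then sleep briefly so broker metadata can propagate before producing, is generic enough to sketch (hypothetical wrapper; create_fn stands in for whichever helper a suite uses):

    import time

    def setup_test_topics(create_fn, system_test_env, testcase_env, settle_seconds=5):
        """Create the test's topics, then pause so broker metadata can propagate."""
        create_fn(system_test_env, testcase_env)
        time.sleep(settle_seconds)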