You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/12/17 23:32:23 UTC
git commit: KAFKA-647 Provide a property in System Test for no. of
topics and topics string will be generated automatically;
reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 f0d26335f -> 9001d7483
KAFKA-647 Provide a property in System Test for no. of topics and topics string will be generated automatically; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9001d748
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9001d748
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9001d748
Branch: refs/heads/0.8
Commit: 9001d74839772d148443d3d86aca48f5dbbc890b
Parents: f0d2633
Author: John Fung <fu...@gmail.com>
Authored: Mon Dec 17 14:12:27 2012 -0800
Committer: Neha Narkhede <ne...@apache.org>
Committed: Mon Dec 17 14:12:27 2012 -0800
----------------------------------------------------------------------
.../testcase_9051/cluster_config.json | 48 +++++-----
.../testcase_9051/testcase_9051_properties.json | 5 +-
system_test/utils/kafka_system_test_utils.py | 77 +++++++++++++-
system_test/utils/testcase_env.py | 5 +
4 files changed, 103 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9001d748/system_test/replication_testsuite/testcase_9051/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_9051/cluster_config.json b/system_test/replication_testsuite/testcase_9051/cluster_config.json
index 7bef92b..8ed896b 100644
--- a/system_test/replication_testsuite/testcase_9051/cluster_config.json
+++ b/system_test/replication_testsuite/testcase_9051/cluster_config.json
@@ -2,57 +2,57 @@
"cluster_config": [
{
"entity_id": "0",
- "hostname": "esv4-app18.corp",
+ "hostname": "localhost",
"role": "zookeeper",
"cluster_name": "source",
- "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
- "java_home": "/export/apps/jdk/JDK-1_6_0_21",
- "jmx_port": "9900"
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9990"
},
{
"entity_id": "1",
- "hostname": "esv4-app18.corp",
+ "hostname": "localhost",
"role": "broker",
"cluster_name": "source",
- "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
- "java_home": "/export/apps/jdk/JDK-1_6_0_21",
- "jmx_port": "9901"
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9991"
},
{
"entity_id": "2",
- "hostname": "esv4-app19.corp",
+ "hostname": "localhost",
"role": "broker",
"cluster_name": "source",
- "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
- "java_home": "/export/apps/jdk/JDK-1_6_0_21",
- "jmx_port": "9902"
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9992"
},
{
"entity_id": "3",
- "hostname": "esv4-app20.corp",
+ "hostname": "localhost",
"role": "broker",
"cluster_name": "source",
- "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
- "java_home": "/export/apps/jdk/JDK-1_6_0_21",
- "jmx_port": "9903"
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9993"
},
{
"entity_id": "4",
- "hostname": "esv4-app18.corp",
+ "hostname": "localhost",
"role": "producer_performance",
"cluster_name": "source",
- "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
- "java_home": "/export/apps/jdk/JDK-1_6_0_21",
- "jmx_port": "9909"
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9997"
},
{
"entity_id": "5",
- "hostname": "esv4-app18.corp",
+ "hostname": "localhost",
"role": "console_consumer",
"cluster_name": "source",
- "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
- "java_home": "/export/apps/jdk/JDK-1_6_0_21",
- "jmx_port": "9910"
+ "kafka_home": "default",
+ "java_home": "default",
+ "jmx_port": "9998"
}
]
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9001d748/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json b/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
index 9a99c39..1a03724 100644
--- a/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
+++ b/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
@@ -20,6 +20,7 @@
"consumer_multi_topics_mode": "true",
"sleep_seconds_between_producer_calls": "5",
"message_producing_free_time_sec": "15",
+ "num_topics_for_auto_generated_string": "20",
"num_messages_to_produce_per_producer_call": "50"
},
"entities": [
@@ -59,7 +60,7 @@
},
{
"entity_id": "4",
- "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+ "topic": "t001",
"threads": "5",
"compression-codec": "0",
"message-size": "500",
@@ -73,7 +74,7 @@
},
{
"entity_id": "5",
- "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+ "topic": "t001",
"groupid": "mytestgroup",
"consumer-timeout-ms": "60000",
"zookeeper": "localhost:2188",
http://git-wip-us.apache.org/repos/asf/kafka/blob/9001d748/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index c429833..293ef5a 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -777,9 +777,28 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
# testcase configurations:
testcaseList = testcaseEnv.testcaseConfigsList
- topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
+
+ # get testcase arguments
+ # 1. topics
+ numTopicsForAutoGenString = -1
+ try:
+ numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"])
+ except:
+ pass
+
+ topic = ""
+ if numTopicsForAutoGenString < 0:
+ topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
+ else:
+ topic = generate_topics_string("topic", numTopicsForAutoGenString)
+
+ # update this variable and will be used by data validation functions
+ testcaseEnv.consumerTopicsString = topic
+
+ # 2. consumer timeout
timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "consumer-timeout-ms")
+ # 3. consumer formatter
formatterOption = ""
try:
formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "formatter")
@@ -789,6 +808,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
if len(formatterOption) > 0:
formatterOption = " --formatter " + formatterOption + " "
+ # get zk.connect
zkConnectStr = ""
if clusterName == "source":
zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
@@ -857,6 +877,34 @@ def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client):
time.sleep(1)
testcaseEnv.lock.release()
+def generate_topics_string(topicPrefix, numOfTopics):
+ # return a topics string in the following format:
+ # <topicPrefix>_0001,<topicPrefix>_0002,...
+ # eg. "topic_0001,topic_0002,...,topic_xxxx"
+
+ topicsStr = ""
+ counter = 1
+ idx = "1"
+ while counter <= numOfTopics:
+ if counter <= 9:
+ idx = "000" + str(counter)
+ elif counter <= 99:
+ idx = "00" + str(counter)
+ elif counter <= 999:
+ idx = "0" + str(counter)
+ elif counter <= 9999:
+ idx = str(counter)
+ else:
+ raise Exception("Error: no. of topics must be under 10000 - current topics count : " + counter)
+
+ if len(topicsStr) == 0:
+ topicsStr = topicPrefix + "_" + idx
+ else:
+ topicsStr = topicsStr + "," + topicPrefix + "_" + idx
+
+ counter += 1
+ return topicsStr
+
def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
host = producerConfig["hostname"]
entityId = producerConfig["entity_id"]
@@ -868,9 +916,24 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
jmxPort = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port")
kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
+ # get optional testcase arguments
+ numTopicsForAutoGenString = -1
+ try:
+ numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"])
+ except:
+ pass
+
# testcase configurations:
testcaseConfigsList = testcaseEnv.testcaseConfigsList
- topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
+ topic = ""
+ if numTopicsForAutoGenString < 0:
+ topic = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
+ else:
+ topic = generate_topics_string("topic", numTopicsForAutoGenString)
+
+ # update this variable and will be used by data validation functions
+ testcaseEnv.producerTopicsString = topic
+
threads = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "threads")
compCodec = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "compression-codec")
messageSize = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-size")
@@ -1125,7 +1188,8 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
for prodPerfCfg in prodPerfCfgList:
producerEntityId = prodPerfCfg["entity_id"]
- topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+ #topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+ topic = testcaseEnv.producerTopicsString
acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
@@ -1133,7 +1197,8 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
matchingConsumerEntityId = None
for consumerEntityId in consumerEntityIdList:
- consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+ #consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+ consumerTopic = testcaseEnv.consumerTopicsString
if consumerTopic in topic:
matchingConsumerEntityId = consumerEntityId
break
@@ -2042,14 +2107,14 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
for prodPerfCfg in prodPerfCfgList:
producerEntityId = prodPerfCfg["entity_id"]
- topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+ topicStr = testcaseEnv.producerTopicsString
acks = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
matchingConsumerEntityId = None
for consumerEntityId in consumerEntityIdList:
- consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+ consumerTopic = testcaseEnv.consumerTopicsString
if consumerTopic in topicStr:
matchingConsumerEntityId = consumerEntityId
break
http://git-wip-us.apache.org/repos/asf/kafka/blob/9001d748/system_test/utils/testcase_env.py
----------------------------------------------------------------------
diff --git a/system_test/utils/testcase_env.py b/system_test/utils/testcase_env.py
index 4207b40..bee8716 100644
--- a/system_test/utils/testcase_env.py
+++ b/system_test/utils/testcase_env.py
@@ -127,6 +127,11 @@ class TestcaseEnv():
self.numProducerThreadsRunning = 0
+ # to be used when validating data match - these variables will be
+ # updated by kafka_system_test_utils.start_producer_in_thread
+ self.producerTopicsString = ""
+ self.consumerTopicsString = ""
+
def initWithKnownTestCasePathName(self, testCasePathName):
testcaseDirName = os.path.basename(testCasePathName)
self.testcaseResultsDict["_test_case_name"] = testcaseDirName