You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/12/17 23:32:23 UTC

git commit: KAFKA-647 Provide a property in System Test for no. of topics and topics string will be generated automatically; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 f0d26335f -> 9001d7483


KAFKA-647 Provide a property in System Test for no. of topics and topics string will be generated automatically; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9001d748
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9001d748
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9001d748

Branch: refs/heads/0.8
Commit: 9001d74839772d148443d3d86aca48f5dbbc890b
Parents: f0d2633
Author: John Fung <fu...@gmail.com>
Authored: Mon Dec 17 14:12:27 2012 -0800
Committer: Neha Narkhede <ne...@apache.org>
Committed: Mon Dec 17 14:12:27 2012 -0800

----------------------------------------------------------------------
 .../testcase_9051/cluster_config.json              |   48 +++++-----
 .../testcase_9051/testcase_9051_properties.json    |    5 +-
 system_test/utils/kafka_system_test_utils.py       |   77 +++++++++++++-
 system_test/utils/testcase_env.py                  |    5 +
 4 files changed, 103 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9001d748/system_test/replication_testsuite/testcase_9051/cluster_config.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_9051/cluster_config.json b/system_test/replication_testsuite/testcase_9051/cluster_config.json
index 7bef92b..8ed896b 100644
--- a/system_test/replication_testsuite/testcase_9051/cluster_config.json
+++ b/system_test/replication_testsuite/testcase_9051/cluster_config.json
@@ -2,57 +2,57 @@
     "cluster_config": [
         {
             "entity_id": "0",
-            "hostname": "esv4-app18.corp",
+            "hostname": "localhost",
             "role": "zookeeper",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9900"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9990"
         },
         {
             "entity_id": "1",
-            "hostname": "esv4-app18.corp",
+            "hostname": "localhost",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9901"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9991"
         },
         {
             "entity_id": "2",
-            "hostname": "esv4-app19.corp",
+            "hostname": "localhost",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9902"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9992"
         },
         {
             "entity_id": "3",
-            "hostname": "esv4-app20.corp",
+            "hostname": "localhost",
             "role": "broker",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9903"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9993"
         },
         {
             "entity_id": "4",
-            "hostname": "esv4-app18.corp",
+            "hostname": "localhost",
             "role": "producer_performance",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9909"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9997"
         },
         {
             "entity_id": "5",
-            "hostname": "esv4-app18.corp",
+            "hostname": "localhost",
             "role": "console_consumer",
             "cluster_name": "source",
-            "kafka_home": "/mnt/u001/kafka_08_replication_system_test",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_21",
-            "jmx_port": "9910"
+            "kafka_home": "default",
+            "java_home": "default",
+            "jmx_port": "9998"
         }
     ]
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9001d748/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
----------------------------------------------------------------------
diff --git a/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json b/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
index 9a99c39..1a03724 100644
--- a/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
+++ b/system_test/replication_testsuite/testcase_9051/testcase_9051_properties.json
@@ -20,6 +20,7 @@
     "consumer_multi_topics_mode": "true",
     "sleep_seconds_between_producer_calls": "5",
     "message_producing_free_time_sec": "15",
+    "num_topics_for_auto_generated_string": "20",
     "num_messages_to_produce_per_producer_call": "50"
   },
   "entities": [
@@ -59,7 +60,7 @@
     },
     {
       "entity_id": "4",
-      "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+      "topic": "t001",
       "threads": "5",
       "compression-codec": "0",
       "message-size": "500",
@@ -73,7 +74,7 @@
     },
     {
       "entity_id": "5",
-      "topic": "t001,t002,t003,t004,t005,t006,t007,t008,t009,t010,t011,t012,t013,t014,t015,t016,t017,t018,t019,t020",
+      "topic": "t001",
       "groupid": "mytestgroup",
       "consumer-timeout-ms": "60000",
       "zookeeper": "localhost:2188",

http://git-wip-us.apache.org/repos/asf/kafka/blob/9001d748/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index c429833..293ef5a 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -777,9 +777,28 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
 
         # testcase configurations:
         testcaseList = testcaseEnv.testcaseConfigsList
-        topic     = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
+
+        # get testcase arguments
+        # 1. topics
+        numTopicsForAutoGenString = -1
+        try:
+            numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"])
+        except:
+            pass
+
+        topic = ""
+        if numTopicsForAutoGenString < 0:
+            topic = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "topic")
+        else:
+            topic = generate_topics_string("topic", numTopicsForAutoGenString)
+
+        # save the topics string - it will be read by the data validation functions
+        testcaseEnv.consumerTopicsString = topic
+
+        # 2. consumer timeout
         timeoutMs = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "consumer-timeout-ms")
 
+        # 3. consumer formatter
         formatterOption = ""
         try:
             formatterOption = system_test_utils.get_data_by_lookup_keyval(testcaseList, "entity_id", entityId, "formatter")
@@ -789,6 +808,7 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
         if len(formatterOption) > 0:
             formatterOption = " --formatter " + formatterOption + " "
 
+        # get zk.connect
         zkConnectStr = ""
         if clusterName == "source":
             zkConnectStr = testcaseEnv.userDefinedEnvVarDict["sourceZkConnectStr"]
@@ -857,6 +877,34 @@ def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client):
         time.sleep(1)
         testcaseEnv.lock.release()
 
+def generate_topics_string(topicPrefix, numOfTopics):
+    # return a comma-separated topics string in the following format:
+    # <topicPrefix>_0001,<topicPrefix>_0002,...
+    # eg. "topic_0001,topic_0002,...,topic_xxxx" (index zero-padded to 4 digits)
+    # returns "" if numOfTopics < 1; raises Exception if numOfTopics > 9999
+    topicsStr = ""
+    counter   = 1
+    idx       = "1"
+    while counter <= numOfTopics:
+        if counter <= 9:
+            idx = "000" + str(counter)
+        elif counter <= 99:
+            idx = "00"  + str(counter)
+        elif counter <= 999:
+            idx = "0"  +  str(counter)
+        elif counter <= 9999:
+            idx = str(counter)
+        else:
+            raise Exception("Error: no. of topics must be under 10000 - current topics count : " + str(counter))
+
+        if len(topicsStr) == 0:
+            topicsStr = topicPrefix + "_" + idx
+        else:
+            topicsStr = topicsStr + "," + topicPrefix + "_" + idx
+
+        counter += 1
+    return topicsStr
+
 def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafka07Client):
     host              = producerConfig["hostname"]
     entityId          = producerConfig["entity_id"]
@@ -868,9 +916,24 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
     jmxPort           = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port")
     kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
 
+    # get optional testcase arguments
+    numTopicsForAutoGenString = -1
+    try:
+        numTopicsForAutoGenString = int(testcaseEnv.testcaseArgumentsDict["num_topics_for_auto_generated_string"])
+    except:
+        pass
+
     # testcase configurations:
     testcaseConfigsList = testcaseEnv.testcaseConfigsList
-    topic          = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
+    topic = ""
+    if numTopicsForAutoGenString < 0:
+        topic      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "topic")
+    else:
+        topic      = generate_topics_string("topic", numTopicsForAutoGenString)
+
+    # save the topics string - it will be read by the data validation functions
+    testcaseEnv.producerTopicsString = topic
+
     threads        = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "threads")
     compCodec      = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "compression-codec")
     messageSize    = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "message-size")
@@ -1125,7 +1188,8 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
 
     for prodPerfCfg in prodPerfCfgList:
         producerEntityId = prodPerfCfg["entity_id"]
-        topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        #topic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        topic = testcaseEnv.producerTopicsString
         acks  = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
 
         consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
@@ -1133,7 +1197,8 @@ def validate_data_matched(systemTestEnv, testcaseEnv):
 
         matchingConsumerEntityId = None
         for consumerEntityId in consumerEntityIdList:
-            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            #consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            consumerTopic = testcaseEnv.consumerTopicsString
             if consumerTopic in topic:
                 matchingConsumerEntityId = consumerEntityId
                 break
@@ -2042,14 +2107,14 @@ def validate_data_matched_in_multi_topics_from_single_consumer_producer(systemTe
 
     for prodPerfCfg in prodPerfCfgList:
         producerEntityId = prodPerfCfg["entity_id"]
-        topicStr = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "topic")
+        topicStr = testcaseEnv.producerTopicsString
         acks     = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", producerEntityId, "request-num-acks")
 
         consumerEntityIdList = system_test_utils.get_data_from_list_of_dicts(clusterEntityConfigDictList, "role", "console_consumer", "entity_id")
 
         matchingConsumerEntityId = None
         for consumerEntityId in consumerEntityIdList:
-            consumerTopic = system_test_utils.get_data_by_lookup_keyval(testcaseEnv.testcaseConfigsList, "entity_id", consumerEntityId, "topic")
+            consumerTopic = testcaseEnv.consumerTopicsString
             if consumerTopic in topicStr:
                 matchingConsumerEntityId = consumerEntityId
                 break

http://git-wip-us.apache.org/repos/asf/kafka/blob/9001d748/system_test/utils/testcase_env.py
----------------------------------------------------------------------
diff --git a/system_test/utils/testcase_env.py b/system_test/utils/testcase_env.py
index 4207b40..bee8716 100644
--- a/system_test/utils/testcase_env.py
+++ b/system_test/utils/testcase_env.py
@@ -127,6 +127,11 @@ class TestcaseEnv():
 
         self.numProducerThreadsRunning = 0
 
+        # to be used when validating data match - these variables are updated by
+        # kafka_system_test_utils.start_producer_in_thread and start_console_consumer
+        self.producerTopicsString = ""
+        self.consumerTopicsString = ""
+
     def initWithKnownTestCasePathName(self, testCasePathName):
         testcaseDirName = os.path.basename(testCasePathName)
         self.testcaseResultsDict["_test_case_name"] = testcaseDirName