You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/08/31 01:47:56 UTC

svn commit: r1379232 - in /incubator/kafka/branches/0.8/system_test: ./ replication_testsuite/ replication_testsuite/config/ replication_testsuite/testcase_1/ utils/

Author: nehanarkhede
Date: Thu Aug 30 23:47:55 2012
New Revision: 1379232

URL: http://svn.apache.org/viewvc?rev=1379232&view=rev
Log:
KAFKA-483 Improvements to the system testing framework; patched by John Fung; reviewed by Neha Narkhede

Modified:
    incubator/kafka/branches/0.8/system_test/README.txt
    incubator/kafka/branches/0.8/system_test/cluster_config.json
    incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
    incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
    incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
    incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/metrics.py
    incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
    incubator/kafka/branches/0.8/system_test/utils/testcase_env.py

Modified: incubator/kafka/branches/0.8/system_test/README.txt
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/README.txt?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/README.txt (original)
+++ incubator/kafka/branches/0.8/system_test/README.txt Thu Aug 30 23:47:55 2012
@@ -1,9 +1,7 @@
 # ==========================
 # Known Issues:
 # ==========================
-1. The "broker-list" in system_test/replication_testsuite/testcase_1/testcase_1_properties.json needs to be manually updated to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
-2. Sometimes the running processes may not be terminated properly by the script.
-
+1. This test framework currently doesn't support MacOS due to different "ps" argument options from Linux. The correct ps execution is required to terminate the background running processes properly.
 
 # ==========================
 # Overview
@@ -57,7 +55,8 @@ The framework has the following levels:
   2. Edit system_test/replication_testsuite/testcase_1/testcase_1_properties.json and update "broker-list" to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
   3. To run the test, go to <kafka_home>/system_test and run the following command:
      $ python -B system_test_runner.py 
-
+  4. To turn on debugging, update system_test/system_test_runner.py and uncomment the following line:
+         namedLogger.setLevel(logging.DEBUG)
 
 # ==========================
 # Adding Test Case

Modified: incubator/kafka/branches/0.8/system_test/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/cluster_config.json?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/cluster_config.json (original)
+++ incubator/kafka/branches/0.8/system_test/cluster_config.json Thu Aug 30 23:47:55 2012
@@ -4,48 +4,48 @@
             "entity_id": "0",
             "hostname": "localhost",
             "role": "zookeeper",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "kafka_home": "default",
+            "java_home": "default",
             "jmx_port": "9990"
         },
         {
             "entity_id": "1",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "kafka_home": "default",
+            "java_home": "default",
             "jmx_port": "9991"
         },
         {
             "entity_id": "2",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "kafka_home": "default",
+            "java_home": "default",
             "jmx_port": "9992"
         },
         {
             "entity_id": "3",
             "hostname": "localhost",
             "role": "broker",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "kafka_home": "default",
+            "java_home": "default",
             "jmx_port": "9993"
         },
         {
             "entity_id": "4",
             "hostname": "localhost",
             "role": "producer_performance",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "kafka_home": "default",
+            "java_home": "default",
             "jmx_port": "9994"
         },
         {
             "entity_id": "5",
             "hostname": "localhost",
             "role": "console_consumer",
-            "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
-            "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+            "kafka_home": "default",
+            "java_home": "default",
             "jmx_port": "9995"
         }
     ]

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties Thu Aug 30 23:47:55 2012
@@ -1,4 +1,3 @@
-broker-list=localhost:2181
 topic=mytest
 messages=200
 message-size=100

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Thu Aug 30 23:47:55 2012
@@ -44,6 +44,7 @@ class ReplicaBasicTest(SetupUtils):
     testModuleAbsPathName = os.path.realpath(__file__)
     testSuiteAbsPathName  = os.path.abspath(os.path.dirname(testModuleAbsPathName))
     isLeaderLogPattern    = "Completed the leader state transition"
+    brokerShutDownCompletedPattern = "shut down completed"
 
     def __init__(self, systemTestEnv):
 
@@ -82,12 +83,16 @@ class ReplicaBasicTest(SetupUtils):
                 self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
     
                 # initialize self.testcaseEnv with user-defined environment
-                self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = \
-                    ReplicaBasicTest.isLeaderLogPattern
+                self.testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = ReplicaBasicTest.brokerShutDownCompletedPattern
+                self.testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \
+                    "\[(.*?)\] .* \[Kafka Server (.*?)\], " + ReplicaBasicTest.brokerShutDownCompletedPattern
+
+                self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = ReplicaBasicTest.isLeaderLogPattern
                 self.testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"]  = \
                     "\[(.*?)\] .* Broker (.*?): " + \
                     self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + \
                     " for topic (.*?) partition (.*?) \(.*"
+
                 self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
     
                 # find testcase properties json file
@@ -100,7 +105,7 @@ class ReplicaBasicTest(SetupUtils):
                 testcaseDirName = os.path.basename(testCasePathName)
                 self.testcaseEnv.testcaseResultsDict["test_case_name"] = testcaseDirName
     
-                #### => update testcaseEnv
+                # update testcaseEnv
                 self.testcaseEnv.testCaseBaseDir = testCasePathName
                 self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
                 self.testcaseEnv.testCaseDashboardsDir = self.testcaseEnv.testCaseBaseDir + "/dashboards"
@@ -110,7 +115,6 @@ class ReplicaBasicTest(SetupUtils):
                 for k,v in testcaseNonEntityDataDict.items():
                     if ( k == "description" ): testcaseDescription = v
     
-                #### => update testcaseEnv
                 # TestcaseEnv.testcaseArgumentsDict initialized, this dictionary keeps track of the
                 # "testcase_args" in the testcase_properties.json such as replica_factor, num_partition, ...
                 self.testcaseEnv.testcaseArgumentsDict = testcaseNonEntityDataDict["testcase_args"]
@@ -124,13 +128,13 @@ class ReplicaBasicTest(SetupUtils):
                 # self.testcaseEnv.testCaseLogsDir
                 # self.testcaseEnv.testcaseArgumentsDict
     
+                print
                 # display testcase name and arguments
                 self.log_message("Test Case : " + testcaseDirName)
                 for k,v in self.testcaseEnv.testcaseArgumentsDict.items():
                     self.anonLogger.info("    " + k + " : " + v)
                 self.log_message("Description : " + testcaseDescription)
-    
-    
+   
                 # ================================================================ #
                 # ================================================================ #
                 #            Product Specific Testing Code Starts Here:            #
@@ -153,7 +157,7 @@ class ReplicaBasicTest(SetupUtils):
     
                 # clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
                 kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
-    
+
                 # generate remote hosts log/config dirs if not exist
                 kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
     
@@ -196,63 +200,46 @@ class ReplicaBasicTest(SetupUtils):
                 # 'topic': 'test_1',
                 # 'brokerid': '3'}
     
+                # =============================================
                 # validate to see if leader election is successful
+                # =============================================
                 self.log_message("validating leader election")
                 result = kafka_system_test_utils.validate_leader_election_successful( \
                              self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
     
-                # checking to see if leader bouncing is required in this testcase
+                # =============================================
+                # get leader re-election latency
+                # =============================================
                 bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
                 self.log_message("bounce_leader flag : " + bounceLeaderFlag)
-    
                 if (bounceLeaderFlag.lower() == "true"):
-                    if self.testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
-                        # no leader available for testing => skip this round
-                        self.log_message("stopping all entities")
-                        for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
-                            kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-    
-                        continue
-                    else:
-                        # leader elected => stop leader
-                        try:
-                            leaderEntityId = leaderDict["entity_id"]
-                            leaderBrokerId = leaderDict["brokerid"]
-                            leaderPPid     = self.testcaseEnv.entityParentPidDict[leaderEntityId]
-                        except:
-                            self.log_message("leader details unavailable")
-    
-                        self.log_message("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid)
-                 
-                        kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leaderEntityId, leaderPPid)
-                        self.testcaseEnv.entityParentPidDict[leaderEntityId] = ""
-    
-                    self.logger.info("sleeping for 5s for leader re-election to complete", extra=self.d)
-                    time.sleep(5)
-    
+                    reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict)
+
+                # =============================================
                 # starting producer 
-                self.log_message("starting producer")
+                # =============================================
+                self.log_message("starting producer in the background")
                 kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
                 self.anonLogger.info("sleeping for 10s")
                 time.sleep(10)
-                kafka_system_test_utils.stop_producer()
     
+                # =============================================
                 # starting previously terminated broker 
-                if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
+                # =============================================
+                if bounceLeaderFlag.lower() == "true":
                     self.log_message("starting the previously terminated broker")
-    
-                    stoppedLeaderEntityId = leaderDict["entity_id"]
-                    kafka_system_test_utils.start_entity_in_background(
-                        self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
-    
+                    stoppedLeaderEntityId  = leaderDict["entity_id"]
+                    kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
                     self.anonLogger.info("sleeping for 5s")
                     time.sleep(5)
 
+                # =============================================
                 # starting consumer
-                self.log_message("starting consumer")
+                # =============================================
+                self.log_message("starting consumer in the background")
                 kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
+                self.anonLogger.info("sleeping for 10s")
                 time.sleep(10)
-                kafka_system_test_utils.stop_consumer()
                 
                 # this testcase is completed - so stopping all entities
                 self.log_message("stopping all entities")
@@ -260,6 +247,7 @@ class ReplicaBasicTest(SetupUtils):
                     kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
                     
                 # validate the data matched
+                # =============================================
                 self.log_message("validating data matched")
                 result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
                 
@@ -287,9 +275,27 @@ class ReplicaBasicTest(SetupUtils):
             except Exception as e:
                 self.log_message("Exception while running test {0}".format(e))
                 traceback.print_exc()
+                traceback.print_exc()
+
+            finally:
                 self.log_message("stopping all entities")
+
                 for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
-                    kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+                    kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+                for entityId, jmxParentPidList in self.testcaseEnv.entityJmxParentPidDict.items():
+                    for jmxParentPid in jmxParentPidList:
+                        kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, jmxParentPid)
+
+                for hostname, consumerPPid in self.testcaseEnv.consumerHostParentPidDict.items():
+                    consumerEntityId = system_test_utils.get_data_by_lookup_keyval( \
+                        self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
+                    kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, consumerEntityId, consumerPPid)
+
+                for hostname, producerPPid in self.testcaseEnv.producerHostParentPidDict.items():
+                    producerEntityId = system_test_utils.get_data_by_lookup_keyval( \
+                        self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
+                    kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, producerEntityId, producerPPid)
 
+                #kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
 
- 

Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json Thu Aug 30 23:47:55 2012
@@ -3,7 +3,7 @@
   "testcase_args": {
     "bounce_leader": "true",
     "replica_factor": "3",
-    "num_partition": "2"
+    "num_partition": "1"
   },
   "entities": [
     {
@@ -47,7 +47,6 @@
       "compression-codec": "1",
       "message-size": "500",
       "message": "500",
-      "broker-list": "localhost:9091,localhost:9092,localhost:9093",
       "log_filename": "producer_performance.log",
       "config_filename": "producer_performance.properties"
     },

Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Thu Aug 30 23:47:55 2012
@@ -21,6 +21,7 @@
 # ===================================
 
 import datetime
+import getpass
 import inspect
 import json
 import logging
@@ -132,16 +133,40 @@ def collect_logs_from_remote_hosts(syste
 
         configPathName     = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
         metricsPathName    = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
-        dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
+        logPathName        = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "default")
 
+        # ==============================
+        # collect entity log file
+        # ==============================
+        cmdList = ["scp",
+                   hostname + ":" + logPathName + "/*",
+                   logPathName]
+        cmdStr  = " ".join(cmdList)
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        system_test_utils.sys_call(cmdStr)
+
+        # ==============================
+        # collect entity metrics file
+        # ==============================
         cmdList = ["scp",
                    hostname + ":" + metricsPathName + "/*",
                    metricsPathName]
         cmdStr  = " ".join(cmdList)
         logger.debug("executing command [" + cmdStr + "]", extra=d)
         system_test_utils.sys_call(cmdStr)
- 
 
+    # ==============================
+    # collect dashboards file
+    # ==============================
+    dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
+    cmdList = ["scp",
+               hostname + ":" + dashboardsPathName + "/*",
+               dashboardsPathName]
+    cmdStr  = " ".join(cmdList)
+    logger.debug("executing command [" + cmdStr + "]", extra=d)
+    system_test_utils.sys_call(cmdStr)
+
+ 
 def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
     testCaseBaseDir = testcaseEnv.testCaseBaseDir
 
@@ -212,6 +237,7 @@ def copy_file_with_dict_values(srcFile, 
         outfile.write(line)
     outfile.close()
 
+
 def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
     logger.info("calling generate_properties_files", extra=d)
 
@@ -259,7 +285,7 @@ def generate_overriden_props_files(tests
                                                cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
 
                 elif ( clusterCfg["role"] == "producer_performance"):
-                    tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
+                    #tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
                     copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \
                                                cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
 
@@ -308,6 +334,62 @@ def start_brokers(systemTestEnv, testcas
         start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
 
 
+def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv):
+
+    logger.info("looking up broker shutdown...", extra=d)
+
+    # keep track of broker related data in this dict such as broker id,
+    # entity id and timestamp and return it to the caller function
+    shutdownBrokerDict = {} 
+
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
+                             clusterEntityConfigDictList, "role", "broker", "entity_id")
+
+    for brokerEntityId in brokerEntityIdList:
+
+        hostname   = system_test_utils.get_data_by_lookup_keyval( \
+                         clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
+        logFile    = system_test_utils.get_data_by_lookup_keyval( \
+                         testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
+
+        shutdownBrokerDict["entity_id"] = brokerEntityId
+        shutdownBrokerDict["hostname"]  = hostname
+
+        logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
+        cmdStrList = ["ssh " + hostname,
+                      "\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
+                      logPathName + "/" + logFile + " | ",
+                      "sort | tail -1\""]
+        cmdStr     = " ".join(cmdStrList)
+
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+        for line in subproc.stdout.readlines():
+
+            line = line.rstrip('\n')
+
+            if testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line:
+                logger.info("found the log line : " + line, extra=d)
+                try:
+                    matchObj    = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line)
+                    datetimeStr = matchObj.group(1)
+                    datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
+                    unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
+                    #print "{0:.3f}".format(unixTs)
+                    shutdownBrokerDict["timestamp"] = unixTs
+                    shutdownBrokerDict["brokerid"]  = matchObj.group(2)
+                    logger.info("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d)
+                    return shutdownBrokerDict
+                except:
+                    logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
+                    raise
+            #else:
+            #    logger.debug("unmatched line found [" + line + "]", extra=d)
+
+    return shutdownBrokerDict
+
+
 def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
 
     logger.info("looking up leader...", extra=d)
@@ -327,9 +409,6 @@ def get_leader_elected_log_line(systemTe
         logFile    = system_test_utils.get_data_by_lookup_keyval( \
                          testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
 
-        leaderDict["entity_id"] = brokerEntityId
-        leaderDict["hostname"]  = hostname
-
         logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
         cmdStrList = ["ssh " + hostname,
                       "\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
@@ -351,12 +430,21 @@ def get_leader_elected_log_line(systemTe
                     datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
                     unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
                     #print "{0:.3f}".format(unixTs)
-                    leaderDict["timestamp"] = unixTs
-                    leaderDict["brokerid"]  = matchObj.group(2)
-                    leaderDict["topic"]     = matchObj.group(3)
-                    leaderDict["partition"] = matchObj.group(4)
+
+                    # update leaderDict when
+                    # 1. leaderDict has no logline entry
+                    # 2. leaderDict has existing logline entry but found another logline with more recent timestamp
+                    if (len(leaderDict) > 0 and leaderDict["timestamp"] < unixTs) or (len(leaderDict) == 0):
+                        leaderDict["timestamp"] = unixTs
+                        leaderDict["brokerid"]  = matchObj.group(2)
+                        leaderDict["topic"]     = matchObj.group(3)
+                        leaderDict["partition"] = matchObj.group(4)
+                        leaderDict["entity_id"] = brokerEntityId
+                        leaderDict["hostname"]  = hostname
+                    logger.info("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d)
                 except:
                     logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
+                    raise
             #else:
             #    logger.debug("unmatched line found [" + line + "]", extra=d)
 
@@ -394,7 +482,6 @@ def start_entity_in_background(systemTes
                   logPathName + "/" + logFile + " & echo pid:$! > ",
                   logPathName + "/entity_" + entityId + "_pid'"]
 
-
         # construct zk.connect str and update it to testcaseEnv.userDefinedEnvVarDict.zkConnectStr
         if ( len(testcaseEnv.userDefinedEnvVarDict["zkConnectStr"]) > 0 ):
             testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = \
@@ -407,14 +494,10 @@ def start_entity_in_background(systemTes
                   "'JAVA_HOME=" + javaHome,
                  "JMX_PORT=" + jmxPort,
                   kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
-                  configPathName + "/" + configFile + " &> ",
+                  configPathName + "/" + configFile + " >> ",
                   logPathName + "/" + logFile + " & echo pid:$! > ",
                   logPathName + "/entity_" + entityId + "_pid'"]
 
-    # it seems to be a better idea to launch producer & consumer in separate functions
-    # elif role == "producer_performance":
-    # elif role == "console_consumer":
-
     cmdStr = " ".join(cmdList)
 
     logger.debug("executing command: [" + cmdStr + "]", extra=d)
@@ -432,6 +515,7 @@ def start_entity_in_background(systemTes
             logger.debug("found pid line: [" + line + "]", extra=d)
             tokens = line.split(':')
             testcaseEnv.entityParentPidDict[entityId] = tokens[1]
+            #print "\n#### testcaseEnv.entityParentPidDict ", testcaseEnv.entityParentPidDict, "\n"
 
     time.sleep(1)
     metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
@@ -451,12 +535,14 @@ def start_console_consumer(systemTestEnv
                                 clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
         javaHome          = system_test_utils.get_data_by_lookup_keyval( \
                                 clusterEntityConfigDictList, "entity_id", entityId, "java_home")
+        jmxPort           = system_test_utils.get_data_by_lookup_keyval( \
+                                clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
         kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
 
         logger.info("starting console consumer", extra=d)
 
-        consumerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + \
-                                  "/console_consumer.log"
+        consumerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default")
+        consumerLogPathName = consumerLogPath + "/console_consumer.log"
 
         testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName
 
@@ -465,18 +551,48 @@ def start_console_consumer(systemTestEnv
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
-                   commandArgs + " &> " + consumerLogPathName + "'"]
+                   commandArgs + " &> " + consumerLogPathName,
+                   " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]
 
         cmdStr = " ".join(cmdList)
+
         logger.debug("executing command: [" + cmdStr + "]", extra=d)
         system_test_utils.async_sys_call(cmdStr)
         time.sleep(2)
         metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
+        pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'"
+        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+        # keep track of the remote entity pid in a dictionary
+        for line in subproc.stdout.readlines():
+            if line.startswith("pid"):
+                line = line.rstrip('\n')
+                logger.debug("found pid line: [" + line + "]", extra=d)
+                tokens = line.split(':')
+                testcaseEnv.consumerHostParentPidDict[host] = tokens[1]
+
 
 def start_producer_performance(systemTestEnv, testcaseEnv):
 
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    testcaseConfigsList         = testcaseEnv.testcaseConfigsList
+    brokerListStr = ""
+
+    # construct "broker-list" for producer
+    for clusterEntityConfigDict in clusterEntityConfigDictList:
+        entityRole = clusterEntityConfigDict["role"]
+        if entityRole == "broker":
+            hostname = clusterEntityConfigDict["hostname"]
+            entityId = clusterEntityConfigDict["entity_id"]
+            port     = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port")
+
+            if len(brokerListStr) == 0:
+                brokerListStr = hostname + ":" + port
+            else:
+                brokerListStr = brokerListStr + "," + hostname + ":" + port
+ 
 
     producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \
                              clusterEntityConfigDictList, "role", "producer_performance")
@@ -489,12 +605,14 @@ def start_producer_performance(systemTes
                                 clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
         javaHome          = system_test_utils.get_data_by_lookup_keyval( \
                                 clusterEntityConfigDictList, "entity_id", entityId, "java_home")
+        jmxPort           = system_test_utils.get_data_by_lookup_keyval( \
+                                clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
         kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
 
         logger.info("starting producer preformance", extra=d)
 
-        producerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + \
-                                  "/producer_performance.log"
+        producerLogPath     = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default")
+        producerLogPathName = producerLogPath + "/producer_performance.log"
 
         testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
 
@@ -503,7 +621,9 @@ def start_producer_performance(systemTes
                    "'JAVA_HOME=" + javaHome,
                    "JMX_PORT=" + jmxPort,
                    kafkaRunClassBin + " kafka.perf.ProducerPerformance",
-                   commandArgs + " &> " + producerLogPathName + "'"]
+                   "--broker-list " +brokerListStr,
+                   commandArgs + " &> " + producerLogPathName,
+                   " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
 
         cmdStr = " ".join(cmdList)
         logger.debug("executing command: [" + cmdStr + "]", extra=d)
@@ -511,6 +631,18 @@ def start_producer_performance(systemTes
         time.sleep(1)
         metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
 
+        pidCmdStr = "ssh " + host + " 'cat " + producerLogPath + "/entity_" + entityId + "_pid'"
+        logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+        subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+        # keep track of the remote entity pid in a dictionary
+        for line in subproc.stdout.readlines():
+            if line.startswith("pid"):
+                line = line.rstrip('\n')
+                logger.debug("found pid line: [" + line + "]", extra=d)
+                tokens = line.split(':')
+                testcaseEnv.producerHostParentPidDict[host] = tokens[1]
+
 
 def stop_remote_entity(systemTestEnv, entityId, parentPid):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@@ -519,28 +651,32 @@ def stop_remote_entity(systemTestEnv, en
     pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
 
     logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
-#    system_test_utils.sigterm_remote_process(hostname, pidStack)
+    system_test_utils.sigterm_remote_process(hostname, pidStack)
 #    time.sleep(1)
+#    system_test_utils.sigkill_remote_process(hostname, pidStack)
+
+
+def force_stop_remote_entity(systemTestEnv, entityId, parentPid):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+    hostname  = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
+    pidStack  = system_test_utils.get_remote_child_processes(hostname, parentPid)
+
+    logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
     system_test_utils.sigkill_remote_process(hostname, pidStack)
 
 
 def create_topic(systemTestEnv, testcaseEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
-    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts( \
-                          clusterEntityConfigDictList, "role", "producer_performance")
-    prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts( \
-                          testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
+    prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
+    prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
     prodTopicList   = prodPerfCfgDict[0]["topic"].split(',')
 
-    zkEntityId      = system_test_utils.get_data_by_lookup_keyval( \
-                          clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
-    zkHost          = system_test_utils.get_data_by_lookup_keyval( \
-                          clusterEntityConfigDictList, "role", "zookeeper", "hostname")
-    kafkaHome       = system_test_utils.get_data_by_lookup_keyval( \
-                          clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
-    javaHome        = system_test_utils.get_data_by_lookup_keyval( \
-                          clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
+    zkEntityId      = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
+    zkHost          = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
+    kafkaHome       = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
+    javaHome        = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
     createTopicBin  = kafkaHome + "/bin/kafka-create-topic.sh"
 
     logger.info("zkEntityId : " + zkEntityId, extra=d)
@@ -628,6 +764,8 @@ def validate_leader_election_successful(
         except Exception, e:
             logger.error("leader info not completed: {0}".format(e), extra=d)
             traceback.print_exc()
+            print leaderDict
+            traceback.print_exc()
             validationStatusDict["Validate leader election successful"] = "FAILED"
             return False
     else:
@@ -645,7 +783,8 @@ def cleanup_data_at_remote_hosts(systemT
         hostname         = clusterEntityConfigDict["hostname"]
         entityId         = clusterEntityConfigDict["entity_id"]
         role             = clusterEntityConfigDict["role"]
-        #testcasePathName = testcaseEnv.testcaseBaseDir
+        kafkaHome        = clusterEntityConfigDict["kafka_home"]
+        testCaseBaseDir  = testcaseEnv.testCaseBaseDir
         cmdStr           = ""
         dataDir          = ""
 
@@ -667,17 +806,102 @@ def cleanup_data_at_remote_hosts(systemT
             logger.warn("aborting test...", extra=d)
             sys.exit(1)
 
+        # ============================
+        # cleaning data dir
+        # ============================
         logger.debug("executing command [" + cmdStr + "]", extra=d)
         system_test_utils.sys_call(cmdStr)
 
+        # ============================
+        # cleaning log/metrics/svg, ...
+        # ============================
+        if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
+            # so kafkaHome is a real kafka installation
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
+            cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null\""
+            logger.debug("executing command [" + cmdStr + "]", extra=d)
+            system_test_utils.sys_call(cmdStr)
+
 def get_entity_log_directory(testCaseBaseDir, entity_id, role):
     return testCaseBaseDir + "/logs/" + role + "-" + entity_id
 
 def get_entities_for_role(clusterConfig, role):
     return filter(lambda entity: entity['role'] == role, clusterConfig)
-    
+
 def stop_consumer():
     system_test_utils.sys_call("ps -ef | grep ConsoleConsumer | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")
 
-def stop_producer():
-    system_test_utils.sys_call("ps -ef | grep ProducerPerformance | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")
+def ps_grep_terminate_running_entity(systemTestEnv):
+    clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+    username = getpass.getuser()
+
+    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
+        hostname = clusterEntityConfigDict["hostname"]
+        cmdList  = ["ssh " + hostname,
+                    "\"ps auxw | grep -v grep | grep -v Bootstrap | grep -v vim | grep ^" + username,
+                    "| grep -i 'java\|server\-start\|run\-\|producer\|consumer\|jmxtool' | grep kafka",
+                    "| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + "\""]
+
+        cmdStr = " ".join(cmdList)
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+
+        system_test_utils.sys_call(cmdStr) 
+
+
+def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict):
+    leaderEntityId = None
+    leaderBrokerId = None
+    leaderPPid     = None
+    shutdownLeaderTimestamp = None
+
+    if testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
+        # leader election is not successful - something is wrong => so skip this testcase
+        #continue
+        return None
+    else:
+        # leader elected => stop leader
+        try:
+            leaderEntityId = leaderDict["entity_id"]
+            leaderBrokerId = leaderDict["brokerid"]
+            leaderPPid     = testcaseEnv.entityParentPidDict[leaderEntityId]
+        except:
+            logger.info("leader details unavailable", extra=d)
+            raise
+
+        logger.info("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid, extra=d)
+        stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid)
+
+    logger.info("sleeping for 5s for leader re-election to complete", extra=d)
+    time.sleep(5)
+
+    # get broker shut down completed timestamp
+    shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv)
+    #print shutdownBrokerDict
+    logger.info("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])), extra=d)
+
+    logger.info("looking up new leader", extra=d)
+    leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv)
+    #print leaderDict2
+    logger.info("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d)
+    leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownBrokerDict["timestamp"])
+    logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
+    testcaseEnv.validationStatusDict["Leader Election Latency"] = str("{0:.2f}".format(leaderReElectionLatency * 1000)) + " ms"
+ 
+    return leaderReElectionLatency
+
+

Modified: incubator/kafka/branches/0.8/system_test/utils/metrics.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/metrics.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/metrics.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/metrics.py Thu Aug 30 23:47:55 2012
@@ -31,6 +31,8 @@ import traceback
 
 import csv
 import time 
+import matplotlib as mpl
+mpl.use('Agg')
 import matplotlib.pyplot as plt
 from collections import namedtuple
 import numpy
@@ -78,6 +80,8 @@ def ensure_valid_headers(headers, attrib
         attributeColumnIndex = headers.index(attributes)
         return attributeColumnIndex
     except ValueError as ve:
+        #print "#### attributes : ", attributes
+        #print "#### headers    : ", headers
         raise Exception("There should be exactly one column that matches attribute: {0} in".format(attributes) +  
                         " headers: {0}".format(",".join(headers)))
         
@@ -119,6 +123,7 @@ def plot_graphs(inputCsvFiles, labels, t
             plots.append(p1)
         except Exception as e:
             logger.error("ERROR while plotting data for {0}: {1}".format(inputCsvFile, e), extra=d)
+            traceback.print_exc()
     # find xmin, xmax, ymin, ymax from all csv files
     xmin = min(map(lambda coord: coord.x, coordinates))
     xmax = max(map(lambda coord: coord.x, coordinates))
@@ -172,6 +177,7 @@ def draw_graph_for_role(graphs, entities
 #            print "Finished plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
         except Exception as e:
             logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
+            traceback.print_exc()
 
 def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
     metricsHtmlFile = testcaseDashboardsDir + "/metrics.html"
@@ -231,19 +237,29 @@ def start_metrics_collection(jmxHost, jm
         startMetricsCommand = " ".join(startMetricsCmdList) 
         logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
         system_test_utils.async_sys_call(startMetricsCommand)
-            
-        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid'"
+        time.sleep(1)
+
+        pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid 2> /dev/null'"
         logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
         subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
 
-            # keep track of the remote entity pid in a dictionary
+        # keep track of JMX ppid in a dictionary of entity_id to list of JMX ppid
+        # testcaseEnv.entityJmxParentPidDict:
+        #   key: entity_id
+        #   val: list of JMX ppid associated to that entity_id
+        #   { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
         for line in subproc.stdout.readlines():
+            line = line.rstrip('\n')
+            logger.debug("line: [" + line + "]", extra=d)
             if line.startswith("pid"):
-                line = line.rstrip('\n')
                 logger.debug("found pid line: [" + line + "]", extra=d)
                 tokens  = line.split(':')
                 thisPid = tokens[1]
-                testcaseEnv.entityParentPidDict[thisPid] = thisPid
+                if entityId not in testcaseEnv.entityJmxParentPidDict:
+                    testcaseEnv.entityJmxParentPidDict[entityId] = []
+                testcaseEnv.entityJmxParentPidDict[entityId].append(thisPid)
+                #print "\n#### testcaseEnv.entityJmxParentPidDict ", testcaseEnv.entityJmxParentPidDict, "\n"
+
 
 def stop_metrics_collection(jmxHost, jmxPort):
     logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)

Modified: incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py Thu Aug 30 23:47:55 2012
@@ -24,7 +24,9 @@ import inspect
 import json
 import logging
 import os
+import re
 import signal
+import socket
 import subprocess
 import sys
 import time
@@ -34,6 +36,15 @@ thisClassName = '(system_test_utils)'
 d = {'name_of_class': thisClassName}
 
 
+def get_current_unix_timestamp():
+    ts = time.time()
+    return "{0:.6f}".format(ts)
+
+
+def get_local_hostname():
+    return socket.gethostname()
+
+
 def sys_call(cmdStr):
     output = ""
     #logger.info("executing command [" + cmdStr + "]", extra=d)
@@ -218,6 +229,7 @@ def sigterm_remote_process(hostname, pid
             sys_call_return_subproc(cmdStr)
         except:
             print "WARN - pid:",pid,"not found"
+            raise
 
 
 def sigkill_remote_process(hostname, pidStack):
@@ -231,6 +243,7 @@ def sigkill_remote_process(hostname, pid
             sys_call_return_subproc(cmdStr)
         except:
             print "WARN - pid:",pid,"not found"
+            raise
 
 
 def terminate_process(pidStack):
@@ -240,6 +253,7 @@ def terminate_process(pidStack):
             os.kill(int(pid), signal.SIGTERM)
         except:
             print "WARN - pid:",pid,"not found"
+            raise
 
 
 def convert_keyval_to_cmd_args(configFilePathname):
@@ -267,6 +281,17 @@ def sys_call_return_subproc(cmd_str):
     return p
 
 
+def remote_host_file_exists(hostname, pathname):
+    cmdStr = "ssh " + hostname + " 'ls " + pathname + "'"
+    logger.debug("executing command: [" + cmdStr + "]", extra=d)
+    subproc = sys_call_return_subproc(cmdStr)
+
+    for line in subproc.stdout.readlines():
+        if "No such file or directory" in line:
+            return False
+    return True
+
+
 def remote_host_directory_exists(hostname, path):
     cmdStr = "ssh " + hostname + " 'ls -d " + path + "'"
     logger.debug("executing command: [" + cmdStr + "]", extra=d)
@@ -296,24 +321,39 @@ def remote_host_processes_stopped(hostna
 def setup_remote_hosts(systemTestEnv):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
+    localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
+    localJavaBin   = ""
+    localJavaHome  = ""
+
+    subproc = sys_call_return_subproc("which java")
+    for line in subproc.stdout.readlines():
+        if line.startswith("which: no "):
+            logger.error("No Java binary found in local host", extra=d)
+            return False
+        else:
+            line = line.rstrip('\n')
+            localJavaBin = line
+            matchObj = re.match("(.*)\/bin\/java$", line)
+            localJavaHome = matchObj.group(1)
+
+    listIndex = -1
     for clusterEntityConfigDict in clusterEntityConfigDictList:
+        listIndex += 1
+
         hostname  = clusterEntityConfigDict["hostname"]
         kafkaHome = clusterEntityConfigDict["kafka_home"]
         javaHome  = clusterEntityConfigDict["java_home"]
 
-        localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
-        logger.info("local kafka home : [" + localKafkaHome + "]", extra=d)
-        if kafkaHome != localKafkaHome:
-            logger.error("kafkaHome [" + kafkaHome + "] must be the same as [" + localKafkaHome + "] in host [" + hostname + "]", extra=d)
-            logger.error("please update cluster_config.json and run again. Aborting test ...", extra=d)
-            sys.exit(1)
-
-        #logger.info("checking running processes in host [" + hostname + "]", extra=d)
-        #if not remote_host_processes_stopped(hostname):
-        #    logger.error("Running processes found in host [" + hostname + "]", extra=d)
-        #    return False
+        if hostname == "localhost" and javaHome == "default":
+            clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome
+
+        if hostname == "localhost" and kafkaHome == "default":
+            clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome
+
+        kafkaHome = clusterEntityConfigDict["kafka_home"]
+        javaHome  = clusterEntityConfigDict["java_home"]
 
-        logger.info("checking JAVA_HOME [" + javaHome + "] in host [" + hostname + "]", extra=d)
+        logger.info("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d)
         if not remote_host_directory_exists(hostname, javaHome):
             logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d)
             return False
@@ -339,3 +379,22 @@ def copy_source_to_remote_hosts(hostname
     for line in subproc.stdout.readlines():
         dummyVar = 1
 
+
+def remove_kafka_home_dir_at_remote_hosts(hostname, kafkaHome):
+
+    if remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
+        cmdStr  = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'"
+        logger.info("executing command [" + cmdStr + "]", extra=d)
+        system_test_utils.sys_call(cmdStr)
+
+        cmdStr  = "ssh " + hostname + " 'rm -r " + kafkaHome + "'"
+        logger.info("executing command [" + cmdStr + "]", extra=d)
+        #system_test_utils.sys_call(cmdStr)
+    else:
+        logger.warn("possible destructive command [" + cmdStr + "]", extra=d)
+        logger.warn("check config file: system_test/cluster_config.properties", extra=d)
+        logger.warn("aborting test...", extra=d)
+        sys.exit(1)
+
+
+

Modified: incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/testcase_env.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/testcase_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/testcase_env.py Thu Aug 30 23:47:55 2012
@@ -30,9 +30,30 @@ class TestcaseEnv():
     # Generic testcase environment
     # ================================
 
-    # dictionary of entity parent pid
+    # dictionary of entity_id to ppid for entities such as zookeepers & brokers
+    # key: entity_id
+    # val: ppid of zk or broker associated to that entity_id
+    # { 0: 12345, 1: 12389, ... }
     entityParentPidDict = {}
 
+    # dictionary of entity_id to list of JMX ppid
+    # key: entity_id
+    # val: list of JMX ppid associated to that entity_id
+    # { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
+    entityJmxParentPidDict = {}
+
+    # dictionary of hostname-topic-ppid for consumer
+    # key: hostname
+    # val: dict of topic-ppid
+    # { host1: { test1 : 12345 }, host1: { test2 : 12389 }, ... }
+    consumerHostParentPidDict = {}
+
+    # dictionary of hostname-topic-ppid for producer
+    # key: hostname
+    # val: dict of topic-ppid
+    # { host1: { test1 : 12345 }, host1: { test2 : 12389 }, ... }
+    producerHostParentPidDict = {}
+
     # list of testcase configs
     testcaseConfigsList = []