You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/08/31 01:47:56 UTC
svn commit: r1379232 - in /incubator/kafka/branches/0.8/system_test: ./
replication_testsuite/ replication_testsuite/config/
replication_testsuite/testcase_1/ utils/
Author: nehanarkhede
Date: Thu Aug 30 23:47:55 2012
New Revision: 1379232
URL: http://svn.apache.org/viewvc?rev=1379232&view=rev
Log:
KAFKA-483 Improvements to the system testing framework; patched by John Fung; reviewed by Neha Narkhede
Modified:
incubator/kafka/branches/0.8/system_test/README.txt
incubator/kafka/branches/0.8/system_test/cluster_config.json
incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
incubator/kafka/branches/0.8/system_test/utils/metrics.py
incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
Modified: incubator/kafka/branches/0.8/system_test/README.txt
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/README.txt?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/README.txt (original)
+++ incubator/kafka/branches/0.8/system_test/README.txt Thu Aug 30 23:47:55 2012
@@ -1,9 +1,7 @@
# ==========================
# Known Issues:
# ==========================
-1. The "broker-list" in system_test/replication_testsuite/testcase_1/testcase_1_properties.json needs to be manually updated to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
-2. Sometimes the running processes may not be terminated properly by the script.
-
+1. This test framework currently doesn't support MacOS because its "ps" argument options differ from those on Linux. Correct "ps" execution is required to properly terminate the processes running in the background.
# ==========================
# Overview
@@ -57,7 +55,8 @@ The framework has the following levels:
2. Edit system_test/replication_testsuite/testcase_1/testcase_1_properties.json and update "broker-list" to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
3. To run the test, go to <kafka_home>/system_test and run the following command:
$ python -B system_test_runner.py
-
+ 4. To turn on debugging, update system_test/system_test_runner.py and uncomment the following line:
+ namedLogger.setLevel(logging.DEBUG)
# ==========================
# Adding Test Case
Modified: incubator/kafka/branches/0.8/system_test/cluster_config.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/cluster_config.json?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/cluster_config.json (original)
+++ incubator/kafka/branches/0.8/system_test/cluster_config.json Thu Aug 30 23:47:55 2012
@@ -4,48 +4,48 @@
"entity_id": "0",
"hostname": "localhost",
"role": "zookeeper",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
- "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+ "kafka_home": "default",
+ "java_home": "default",
"jmx_port": "9990"
},
{
"entity_id": "1",
"hostname": "localhost",
"role": "broker",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
- "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+ "kafka_home": "default",
+ "java_home": "default",
"jmx_port": "9991"
},
{
"entity_id": "2",
"hostname": "localhost",
"role": "broker",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
- "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+ "kafka_home": "default",
+ "java_home": "default",
"jmx_port": "9992"
},
{
"entity_id": "3",
"hostname": "localhost",
"role": "broker",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
- "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+ "kafka_home": "default",
+ "java_home": "default",
"jmx_port": "9993"
},
{
"entity_id": "4",
"hostname": "localhost",
"role": "producer_performance",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
- "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+ "kafka_home": "default",
+ "java_home": "default",
"jmx_port": "9994"
},
{
"entity_id": "5",
"hostname": "localhost",
"role": "console_consumer",
- "kafka_home": "/home/nnarkhed/Projects/kafka-440-with-metrics",
- "java_home": "/export/apps/jdk/JDK-1_6_0_27",
+ "kafka_home": "default",
+ "java_home": "default",
"jmx_port": "9995"
}
]
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/config/producer_performance.properties Thu Aug 30 23:47:55 2012
@@ -1,4 +1,3 @@
-broker-list=localhost:2181
topic=mytest
messages=200
message-size=100
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/replica_basic_test.py Thu Aug 30 23:47:55 2012
@@ -44,6 +44,7 @@ class ReplicaBasicTest(SetupUtils):
testModuleAbsPathName = os.path.realpath(__file__)
testSuiteAbsPathName = os.path.abspath(os.path.dirname(testModuleAbsPathName))
isLeaderLogPattern = "Completed the leader state transition"
+ brokerShutDownCompletedPattern = "shut down completed"
def __init__(self, systemTestEnv):
@@ -82,12 +83,16 @@ class ReplicaBasicTest(SetupUtils):
self.testcaseEnv.testSuiteBaseDir = self.testSuiteAbsPathName
# initialize self.testcaseEnv with user-defined environment
- self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = \
- ReplicaBasicTest.isLeaderLogPattern
+ self.testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] = ReplicaBasicTest.brokerShutDownCompletedPattern
+ self.testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"] = \
+ "\[(.*?)\] .* \[Kafka Server (.*?)\], " + ReplicaBasicTest.brokerShutDownCompletedPattern
+
+ self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] = ReplicaBasicTest.isLeaderLogPattern
self.testcaseEnv.userDefinedEnvVarDict["REGX_LEADER_ELECTION_PATTERN"] = \
"\[(.*?)\] .* Broker (.*?): " + \
self.testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + \
" for topic (.*?) partition (.*?) \(.*"
+
self.testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = ""
# find testcase properties json file
@@ -100,7 +105,7 @@ class ReplicaBasicTest(SetupUtils):
testcaseDirName = os.path.basename(testCasePathName)
self.testcaseEnv.testcaseResultsDict["test_case_name"] = testcaseDirName
- #### => update testcaseEnv
+ # update testcaseEnv
self.testcaseEnv.testCaseBaseDir = testCasePathName
self.testcaseEnv.testCaseLogsDir = self.testcaseEnv.testCaseBaseDir + "/logs"
self.testcaseEnv.testCaseDashboardsDir = self.testcaseEnv.testCaseBaseDir + "/dashboards"
@@ -110,7 +115,6 @@ class ReplicaBasicTest(SetupUtils):
for k,v in testcaseNonEntityDataDict.items():
if ( k == "description" ): testcaseDescription = v
- #### => update testcaseEnv
# TestcaseEnv.testcaseArgumentsDict initialized, this dictionary keeps track of the
# "testcase_args" in the testcase_properties.json such as replica_factor, num_partition, ...
self.testcaseEnv.testcaseArgumentsDict = testcaseNonEntityDataDict["testcase_args"]
@@ -124,13 +128,13 @@ class ReplicaBasicTest(SetupUtils):
# self.testcaseEnv.testCaseLogsDir
# self.testcaseEnv.testcaseArgumentsDict
+ print
# display testcase name and arguments
self.log_message("Test Case : " + testcaseDirName)
for k,v in self.testcaseEnv.testcaseArgumentsDict.items():
self.anonLogger.info(" " + k + " : " + v)
self.log_message("Description : " + testcaseDescription)
-
-
+
# ================================================================ #
# ================================================================ #
# Product Specific Testing Code Starts Here: #
@@ -153,7 +157,7 @@ class ReplicaBasicTest(SetupUtils):
# clean up data directories specified in zookeeper.properties and kafka_server_<n>.properties
kafka_system_test_utils.cleanup_data_at_remote_hosts(self.systemTestEnv, self.testcaseEnv)
-
+
# generate remote hosts log/config dirs if not exist
kafka_system_test_utils.generate_testcase_log_dirs_in_remote_hosts(self.systemTestEnv, self.testcaseEnv)
@@ -196,63 +200,46 @@ class ReplicaBasicTest(SetupUtils):
# 'topic': 'test_1',
# 'brokerid': '3'}
+ # =============================================
# validate to see if leader election is successful
+ # =============================================
self.log_message("validating leader election")
result = kafka_system_test_utils.validate_leader_election_successful( \
self.testcaseEnv, leaderDict, self.testcaseEnv.validationStatusDict)
- # checking to see if leader bouncing is required in this testcase
+ # =============================================
+ # get leader re-election latency
+ # =============================================
bounceLeaderFlag = self.testcaseEnv.testcaseArgumentsDict["bounce_leader"]
self.log_message("bounce_leader flag : " + bounceLeaderFlag)
-
if (bounceLeaderFlag.lower() == "true"):
- if self.testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
- # no leader available for testing => skip this round
- self.log_message("stopping all entities")
- for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
- kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
-
- continue
- else:
- # leader elected => stop leader
- try:
- leaderEntityId = leaderDict["entity_id"]
- leaderBrokerId = leaderDict["brokerid"]
- leaderPPid = self.testcaseEnv.entityParentPidDict[leaderEntityId]
- except:
- self.log_message("leader details unavailable")
-
- self.log_message("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid)
-
- kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, leaderEntityId, leaderPPid)
- self.testcaseEnv.entityParentPidDict[leaderEntityId] = ""
-
- self.logger.info("sleeping for 5s for leader re-election to complete", extra=self.d)
- time.sleep(5)
-
+ reelectionLatency = kafka_system_test_utils.get_reelection_latency(self.systemTestEnv, self.testcaseEnv, leaderDict)
+
+ # =============================================
# starting producer
- self.log_message("starting producer")
+ # =============================================
+ self.log_message("starting producer in the background")
kafka_system_test_utils.start_producer_performance(self.systemTestEnv, self.testcaseEnv)
self.anonLogger.info("sleeping for 10s")
time.sleep(10)
- kafka_system_test_utils.stop_producer()
+ # =============================================
# starting previously terminated broker
- if (bounceLeaderFlag.lower() == "true" and not self.testcaseEnv.entityParentPidDict[leaderEntityId]):
+ # =============================================
+ if bounceLeaderFlag.lower() == "true":
self.log_message("starting the previously terminated broker")
-
- stoppedLeaderEntityId = leaderDict["entity_id"]
- kafka_system_test_utils.start_entity_in_background(
- self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
-
+ stoppedLeaderEntityId = leaderDict["entity_id"]
+ kafka_system_test_utils.start_entity_in_background(self.systemTestEnv, self.testcaseEnv, stoppedLeaderEntityId)
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
+ # =============================================
# starting consumer
- self.log_message("starting consumer")
+ # =============================================
+ self.log_message("starting consumer in the background")
kafka_system_test_utils.start_console_consumer(self.systemTestEnv, self.testcaseEnv)
+ self.anonLogger.info("sleeping for 10s")
time.sleep(10)
- kafka_system_test_utils.stop_consumer()
# this testcase is completed - so stopping all entities
self.log_message("stopping all entities")
@@ -260,6 +247,7 @@ class ReplicaBasicTest(SetupUtils):
kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
# validate the data matched
+ # =============================================
self.log_message("validating data matched")
result = kafka_system_test_utils.validate_data_matched(self.systemTestEnv, self.testcaseEnv)
@@ -287,9 +275,27 @@ class ReplicaBasicTest(SetupUtils):
except Exception as e:
self.log_message("Exception while running test {0}".format(e))
traceback.print_exc()
+ traceback.print_exc()
+
+ finally:
self.log_message("stopping all entities")
+
for entityId, parentPid in self.testcaseEnv.entityParentPidDict.items():
- kafka_system_test_utils.stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+ kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, parentPid)
+
+ for entityId, jmxParentPidList in self.testcaseEnv.entityJmxParentPidDict.items():
+ for jmxParentPid in jmxParentPidList:
+ kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, entityId, jmxParentPid)
+
+ for hostname, consumerPPid in self.testcaseEnv.consumerHostParentPidDict.items():
+ consumerEntityId = system_test_utils.get_data_by_lookup_keyval( \
+ self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
+ kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, consumerEntityId, consumerPPid)
+
+ for hostname, producerPPid in self.testcaseEnv.producerHostParentPidDict.items():
+ producerEntityId = system_test_utils.get_data_by_lookup_keyval( \
+ self.systemTestEnv.clusterEntityConfigDictList, "hostname", hostname, "entity_id")
+ kafka_system_test_utils.force_stop_remote_entity(self.systemTestEnv, producerEntityId, producerPPid)
+ #kafka_system_test_utils.ps_grep_terminate_running_entity(self.systemTestEnv)
-
Modified: incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json (original)
+++ incubator/kafka/branches/0.8/system_test/replication_testsuite/testcase_1/testcase_1_properties.json Thu Aug 30 23:47:55 2012
@@ -3,7 +3,7 @@
"testcase_args": {
"bounce_leader": "true",
"replica_factor": "3",
- "num_partition": "2"
+ "num_partition": "1"
},
"entities": [
{
@@ -47,7 +47,6 @@
"compression-codec": "1",
"message-size": "500",
"message": "500",
- "broker-list": "localhost:9091,localhost:9092,localhost:9093",
"log_filename": "producer_performance.log",
"config_filename": "producer_performance.properties"
},
Modified: incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/kafka_system_test_utils.py Thu Aug 30 23:47:55 2012
@@ -21,6 +21,7 @@
# ===================================
import datetime
+import getpass
import inspect
import json
import logging
@@ -132,16 +133,40 @@ def collect_logs_from_remote_hosts(syste
configPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "config")
metricsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "metrics")
- dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
+ logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "default")
+ # ==============================
+ # collect entity log file
+ # ==============================
+ cmdList = ["scp",
+ hostname + ":" + logPathName + "/*",
+ logPathName]
+ cmdStr = " ".join(cmdList)
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+ system_test_utils.sys_call(cmdStr)
+
+ # ==============================
+ # collect entity metrics file
+ # ==============================
cmdList = ["scp",
hostname + ":" + metricsPathName + "/*",
metricsPathName]
cmdStr = " ".join(cmdList)
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
-
+ # ==============================
+ # collect dashboards file
+ # ==============================
+ dashboardsPathName = get_testcase_config_log_dir_pathname(testcaseEnv, role, entity_id, "dashboards")
+ cmdList = ["scp",
+ hostname + ":" + dashboardsPathName + "/*",
+ dashboardsPathName]
+ cmdStr = " ".join(cmdList)
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+ system_test_utils.sys_call(cmdStr)
+
+
def generate_testcase_log_dirs_in_remote_hosts(systemTestEnv, testcaseEnv):
testCaseBaseDir = testcaseEnv.testCaseBaseDir
@@ -212,6 +237,7 @@ def copy_file_with_dict_values(srcFile,
outfile.write(line)
outfile.close()
+
def generate_overriden_props_files(testsuitePathname, testcaseEnv, systemTestEnv):
logger.info("calling generate_properties_files", extra=d)
@@ -259,7 +285,7 @@ def generate_overriden_props_files(tests
cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
elif ( clusterCfg["role"] == "producer_performance"):
- tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
+ #tcCfg["brokerinfo"] = "zk.connect" + "=" + zkConnectStr
copy_file_with_dict_values(cfgTemplatePathname + "/producer_performance.properties", \
cfgDestPathname + "/" + tcCfg["config_filename"], tcCfg)
@@ -308,6 +334,62 @@ def start_brokers(systemTestEnv, testcas
start_entity_in_background(systemTestEnv, testcaseEnv, brokerEntityId)
+def get_broker_shutdown_log_line(systemTestEnv, testcaseEnv):
+
+ logger.info("looking up broker shutdown...", extra=d)
+
+ # keep track of broker related data in this dict such as broker id,
+ # entity id and timestamp and return it to the caller function
+ shutdownBrokerDict = {}
+
+ clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+ brokerEntityIdList = system_test_utils.get_data_from_list_of_dicts( \
+ clusterEntityConfigDictList, "role", "broker", "entity_id")
+
+ for brokerEntityId in brokerEntityIdList:
+
+ hostname = system_test_utils.get_data_by_lookup_keyval( \
+ clusterEntityConfigDictList, "entity_id", brokerEntityId, "hostname")
+ logFile = system_test_utils.get_data_by_lookup_keyval( \
+ testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
+
+ shutdownBrokerDict["entity_id"] = brokerEntityId
+ shutdownBrokerDict["hostname"] = hostname
+
+ logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
+ cmdStrList = ["ssh " + hostname,
+ "\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] + "' ",
+ logPathName + "/" + logFile + " | ",
+ "sort | tail -1\""]
+ cmdStr = " ".join(cmdStrList)
+
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+ subproc = system_test_utils.sys_call_return_subproc(cmdStr)
+ for line in subproc.stdout.readlines():
+
+ line = line.rstrip('\n')
+
+ if testcaseEnv.userDefinedEnvVarDict["BROKER_SHUT_DOWN_COMPLETED_MSG"] in line:
+ logger.info("found the log line : " + line, extra=d)
+ try:
+ matchObj = re.match(testcaseEnv.userDefinedEnvVarDict["REGX_BROKER_SHUT_DOWN_COMPLETED_PATTERN"], line)
+ datetimeStr = matchObj.group(1)
+ datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
+ unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
+ #print "{0:.3f}".format(unixTs)
+ shutdownBrokerDict["timestamp"] = unixTs
+ shutdownBrokerDict["brokerid"] = matchObj.group(2)
+ logger.info("brokerid: [" + shutdownBrokerDict["brokerid"] + "] entity_id: [" + shutdownBrokerDict["entity_id"] + "]", extra=d)
+ return shutdownBrokerDict
+ except:
+ logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
+ raise
+ #else:
+ # logger.debug("unmatched line found [" + line + "]", extra=d)
+
+ return shutdownBrokerDict
+
+
def get_leader_elected_log_line(systemTestEnv, testcaseEnv):
logger.info("looking up leader...", extra=d)
@@ -327,9 +409,6 @@ def get_leader_elected_log_line(systemTe
logFile = system_test_utils.get_data_by_lookup_keyval( \
testcaseEnv.testcaseConfigsList, "entity_id", brokerEntityId, "log_filename")
- leaderDict["entity_id"] = brokerEntityId
- leaderDict["hostname"] = hostname
-
logPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "broker", brokerEntityId, "default")
cmdStrList = ["ssh " + hostname,
"\"grep -i -h '" + testcaseEnv.userDefinedEnvVarDict["LEADER_ELECTION_COMPLETED_MSG"] + "' ",
@@ -351,12 +430,21 @@ def get_leader_elected_log_line(systemTe
datetimeObj = datetime.strptime(datetimeStr, "%Y-%m-%d %H:%M:%S,%f")
unixTs = time.mktime(datetimeObj.timetuple()) + 1e-6*datetimeObj.microsecond
#print "{0:.3f}".format(unixTs)
- leaderDict["timestamp"] = unixTs
- leaderDict["brokerid"] = matchObj.group(2)
- leaderDict["topic"] = matchObj.group(3)
- leaderDict["partition"] = matchObj.group(4)
+
+ # update leaderDict when
+ # 1. leaderDict has no logline entry
+ # 2. leaderDict has existing logline entry but found another logline with more recent timestamp
+ if (len(leaderDict) > 0 and leaderDict["timestamp"] < unixTs) or (len(leaderDict) == 0):
+ leaderDict["timestamp"] = unixTs
+ leaderDict["brokerid"] = matchObj.group(2)
+ leaderDict["topic"] = matchObj.group(3)
+ leaderDict["partition"] = matchObj.group(4)
+ leaderDict["entity_id"] = brokerEntityId
+ leaderDict["hostname"] = hostname
+ logger.info("brokerid: [" + leaderDict["brokerid"] + "] entity_id: [" + leaderDict["entity_id"] + "]", extra=d)
except:
logger.error("ERROR [unable to find matching leader details: Has the matching pattern changed?]", extra=d)
+ raise
#else:
# logger.debug("unmatched line found [" + line + "]", extra=d)
@@ -394,7 +482,6 @@ def start_entity_in_background(systemTes
logPathName + "/" + logFile + " & echo pid:$! > ",
logPathName + "/entity_" + entityId + "_pid'"]
-
# construct zk.connect str and update it to testcaseEnv.userDefinedEnvVarDict.zkConnectStr
if ( len(testcaseEnv.userDefinedEnvVarDict["zkConnectStr"]) > 0 ):
testcaseEnv.userDefinedEnvVarDict["zkConnectStr"] = \
@@ -407,14 +494,10 @@ def start_entity_in_background(systemTes
"'JAVA_HOME=" + javaHome,
"JMX_PORT=" + jmxPort,
kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
- configPathName + "/" + configFile + " &> ",
+ configPathName + "/" + configFile + " >> ",
logPathName + "/" + logFile + " & echo pid:$! > ",
logPathName + "/entity_" + entityId + "_pid'"]
- # it seems to be a better idea to launch producer & consumer in separate functions
- # elif role == "producer_performance":
- # elif role == "console_consumer":
-
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
@@ -432,6 +515,7 @@ def start_entity_in_background(systemTes
logger.debug("found pid line: [" + line + "]", extra=d)
tokens = line.split(':')
testcaseEnv.entityParentPidDict[entityId] = tokens[1]
+ #print "\n#### testcaseEnv.entityParentPidDict ", testcaseEnv.entityParentPidDict, "\n"
time.sleep(1)
metrics.start_metrics_collection(hostname, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
@@ -451,12 +535,14 @@ def start_console_consumer(systemTestEnv
clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "java_home")
+ jmxPort = system_test_utils.get_data_by_lookup_keyval( \
+ clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
logger.info("starting console consumer", extra=d)
- consumerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default") + \
- "/console_consumer.log"
+ consumerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "console_consumer", entityId, "default")
+ consumerLogPathName = consumerLogPath + "/console_consumer.log"
testcaseEnv.userDefinedEnvVarDict["consumerLogPathName"] = consumerLogPathName
@@ -465,18 +551,48 @@ def start_console_consumer(systemTestEnv
"'JAVA_HOME=" + javaHome,
"JMX_PORT=" + jmxPort,
kafkaRunClassBin + " kafka.consumer.ConsoleConsumer",
- commandArgs + " &> " + consumerLogPathName + "'"]
+ commandArgs + " &> " + consumerLogPathName,
+ " & echo pid:$! > " + consumerLogPath + "/entity_" + entityId + "_pid'"]
cmdStr = " ".join(cmdList)
+
logger.debug("executing command: [" + cmdStr + "]", extra=d)
system_test_utils.async_sys_call(cmdStr)
time.sleep(2)
metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
+ pidCmdStr = "ssh " + host + " 'cat " + consumerLogPath + "/entity_" + entityId + "_pid'"
+ logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+ subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+ # keep track of the remote entity pid in a dictionary
+ for line in subproc.stdout.readlines():
+ if line.startswith("pid"):
+ line = line.rstrip('\n')
+ logger.debug("found pid line: [" + line + "]", extra=d)
+ tokens = line.split(':')
+ testcaseEnv.consumerHostParentPidDict[host] = tokens[1]
+
def start_producer_performance(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+ testcaseConfigsList = testcaseEnv.testcaseConfigsList
+ brokerListStr = ""
+
+ # construct "broker-list" for producer
+ for clusterEntityConfigDict in clusterEntityConfigDictList:
+ entityRole = clusterEntityConfigDict["role"]
+ if entityRole == "broker":
+ hostname = clusterEntityConfigDict["hostname"]
+ entityId = clusterEntityConfigDict["entity_id"]
+ port = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "port")
+
+ if len(brokerListStr) == 0:
+ brokerListStr = hostname + ":" + port
+ else:
+ brokerListStr = brokerListStr + "," + hostname + ":" + port
+
producerConfigList = system_test_utils.get_dict_from_list_of_dicts( \
clusterEntityConfigDictList, "role", "producer_performance")
@@ -489,12 +605,14 @@ def start_producer_performance(systemTes
clusterEntityConfigDictList, "entity_id", entityId, "kafka_home")
javaHome = system_test_utils.get_data_by_lookup_keyval( \
clusterEntityConfigDictList, "entity_id", entityId, "java_home")
+ jmxPort = system_test_utils.get_data_by_lookup_keyval( \
+ clusterEntityConfigDictList, "entity_id", entityId, "jmx_port")
kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
logger.info("starting producer preformance", extra=d)
- producerLogPathName = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default") + \
- "/producer_performance.log"
+ producerLogPath = get_testcase_config_log_dir_pathname(testcaseEnv, "producer_performance", entityId, "default")
+ producerLogPathName = producerLogPath + "/producer_performance.log"
testcaseEnv.userDefinedEnvVarDict["producerLogPathName"] = producerLogPathName
@@ -503,7 +621,9 @@ def start_producer_performance(systemTes
"'JAVA_HOME=" + javaHome,
"JMX_PORT=" + jmxPort,
kafkaRunClassBin + " kafka.perf.ProducerPerformance",
- commandArgs + " &> " + producerLogPathName + "'"]
+ "--broker-list " +brokerListStr,
+ commandArgs + " &> " + producerLogPathName,
+ " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
@@ -511,6 +631,18 @@ def start_producer_performance(systemTes
time.sleep(1)
metrics.start_metrics_collection(host, jmxPort, role, entityId, systemTestEnv, testcaseEnv)
+ pidCmdStr = "ssh " + host + " 'cat " + producerLogPath + "/entity_" + entityId + "_pid'"
+ logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
+ subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
+
+ # keep track of the remote entity pid in a dictionary
+ for line in subproc.stdout.readlines():
+ if line.startswith("pid"):
+ line = line.rstrip('\n')
+ logger.debug("found pid line: [" + line + "]", extra=d)
+ tokens = line.split(':')
+ testcaseEnv.producerHostParentPidDict[host] = tokens[1]
+
def stop_remote_entity(systemTestEnv, entityId, parentPid):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@@ -519,28 +651,32 @@ def stop_remote_entity(systemTestEnv, en
pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid)
logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
-# system_test_utils.sigterm_remote_process(hostname, pidStack)
+ system_test_utils.sigterm_remote_process(hostname, pidStack)
# time.sleep(1)
+# system_test_utils.sigkill_remote_process(hostname, pidStack)
+
+
+def force_stop_remote_entity(systemTestEnv, entityId, parentPid):
+ clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+
+ hostname = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", entityId, "hostname")
+ pidStack = system_test_utils.get_remote_child_processes(hostname, parentPid)
+
+ logger.info("terminating process id: " + parentPid + " in host: " + hostname, extra=d)
system_test_utils.sigkill_remote_process(hostname, pidStack)
def create_topic(systemTestEnv, testcaseEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
- prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts( \
- clusterEntityConfigDictList, "role", "producer_performance")
- prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts( \
- testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
+ prodPerfCfgList = system_test_utils.get_dict_from_list_of_dicts(clusterEntityConfigDictList, "role", "producer_performance")
+ prodPerfCfgDict = system_test_utils.get_dict_from_list_of_dicts(testcaseEnv.testcaseConfigsList, "entity_id", prodPerfCfgList[0]["entity_id"])
prodTopicList = prodPerfCfgDict[0]["topic"].split(',')
- zkEntityId = system_test_utils.get_data_by_lookup_keyval( \
- clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
- zkHost = system_test_utils.get_data_by_lookup_keyval( \
- clusterEntityConfigDictList, "role", "zookeeper", "hostname")
- kafkaHome = system_test_utils.get_data_by_lookup_keyval( \
- clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
- javaHome = system_test_utils.get_data_by_lookup_keyval( \
- clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
+ zkEntityId = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "entity_id")
+ zkHost = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "role", "zookeeper", "hostname")
+ kafkaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "kafka_home")
+ javaHome = system_test_utils.get_data_by_lookup_keyval(clusterEntityConfigDictList, "entity_id", zkEntityId, "java_home")
createTopicBin = kafkaHome + "/bin/kafka-create-topic.sh"
logger.info("zkEntityId : " + zkEntityId, extra=d)
@@ -628,6 +764,8 @@ def validate_leader_election_successful(
except Exception, e:
logger.error("leader info not completed: {0}".format(e), extra=d)
traceback.print_exc()
+ print leaderDict
+ traceback.print_exc()
validationStatusDict["Validate leader election successful"] = "FAILED"
return False
else:
@@ -645,7 +783,8 @@ def cleanup_data_at_remote_hosts(systemT
hostname = clusterEntityConfigDict["hostname"]
entityId = clusterEntityConfigDict["entity_id"]
role = clusterEntityConfigDict["role"]
- #testcasePathName = testcaseEnv.testcaseBaseDir
+ kafkaHome = clusterEntityConfigDict["kafka_home"]
+ testCaseBaseDir = testcaseEnv.testCaseBaseDir
cmdStr = ""
dataDir = ""
@@ -667,17 +806,102 @@ def cleanup_data_at_remote_hosts(systemT
logger.warn("aborting test...", extra=d)
sys.exit(1)
+ # ============================
+ # cleaning data dir
+ # ============================
logger.debug("executing command [" + cmdStr + "]", extra=d)
system_test_utils.sys_call(cmdStr)
+ # ============================
+ # cleaning log/metrics/svg, ...
+ # ============================
+ if system_test_utils.remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
+ # so kafkaHome is a real kafka installation
+ cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.log' | xargs rm 2> /dev/null\""
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+ system_test_utils.sys_call(cmdStr)
+
+ cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*_pid' | xargs rm 2> /dev/null\""
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+ system_test_utils.sys_call(cmdStr)
+
+ cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.csv' | xargs rm 2> /dev/null\""
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+ system_test_utils.sys_call(cmdStr)
+
+ cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.svg' | xargs rm 2> /dev/null\""
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+ system_test_utils.sys_call(cmdStr)
+
+ cmdStr = "ssh " + hostname + " \"find " + testCaseBaseDir + " -name '*.html' | xargs rm 2> /dev/null\""
+ logger.debug("executing command [" + cmdStr + "]", extra=d)
+ system_test_utils.sys_call(cmdStr)
+
def get_entity_log_directory(testCaseBaseDir, entity_id, role):
return testCaseBaseDir + "/logs/" + role + "-" + entity_id
def get_entities_for_role(clusterConfig, role):
return filter(lambda entity: entity['role'] == role, clusterConfig)
-
+
def stop_consumer():
system_test_utils.sys_call("ps -ef | grep ConsoleConsumer | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")
-def stop_producer():
- system_test_utils.sys_call("ps -ef | grep ProducerPerformance | grep -v grep | tr -s ' ' | cut -f2 -d' ' | xargs kill -15")
+def ps_grep_terminate_running_entity(systemTestEnv):
+    # Send SIGKILL to the kafka-related processes owned by the current user
+    # on every host named in the cluster configuration.
+    #
+    # systemTestEnv: object whose clusterEntityConfigDictList is a list of
+    #                dicts, each carrying a "hostname" key.
+    #
+    # The ps/grep/kill pipeline is executed on each host over ssh through
+    # system_test_utils.sys_call; nothing is returned.
+    username = getpass.getuser()
+    sweptHosts = set()
+
+    for clusterEntityConfigDict in systemTestEnv.clusterEntityConfigDictList:
+        hostname = clusterEntityConfigDict["hostname"]
+
+        # One sweep kills every matching process on a host, so entities that
+        # share a hostname do not need a second ssh round trip.
+        if hostname in sweptHosts:
+            continue
+        sweptHosts.add(hostname)
+
+        # The grep alternation is a raw string so that "\|" and "\-" reach
+        # the remote shell unchanged without relying on Python keeping
+        # unknown escape sequences as-is.
+        cmdList = ["ssh " + hostname,
+                   "\"ps auxw | grep -v grep | grep -v Bootstrap | grep -v vim | grep ^" + username,
+                   r"| grep -i 'java\|server\-start\|run\-\|producer\|consumer\|jmxtool' | grep kafka",
+                   "| tr -s ' ' | cut -f2 -d ' ' | xargs kill -9" + "\""]
+
+        cmdStr = " ".join(cmdList)
+        logger.debug("executing command [" + cmdStr + "]", extra=d)
+
+        system_test_utils.sys_call(cmdStr)
+
+
+def get_reelection_latency(systemTestEnv, testcaseEnv, leaderDict):
+ # Stop the current leader broker and measure how long the re-election took.
+ #
+ # systemTestEnv: passed through to stop_remote_entity and the log-line helpers
+ # testcaseEnv : supplies validationStatusDict and entityParentPidDict
+ # leaderDict : must provide "entity_id" and "brokerid" of the current leader
+ #
+ # Returns None when "Validate leader election successful" is already "FAILED";
+ # otherwise returns the latency in seconds (new-leader timestamp minus
+ # shutdown-completed timestamp) and stores the same value, in ms, under
+ # validationStatusDict["Leader Election Latency"].
+ # Any exception raised while reading the leader details is logged and re-raised.
+ # NOTE(review): leaderBrokerId and shutdownLeaderTimestamp are assigned but never read here.
+ leaderEntityId = None
+ leaderBrokerId = None
+ leaderPPid = None
+ shutdownLeaderTimestamp = None
+
+ if testcaseEnv.validationStatusDict["Validate leader election successful"] == "FAILED":
+ # leader election is not successful - something is wrong => so skip this testcase
+ #continue
+ return None
+ else:
+ # leader elected => stop leader
+ try:
+ leaderEntityId = leaderDict["entity_id"]
+ leaderBrokerId = leaderDict["brokerid"]
+ leaderPPid = testcaseEnv.entityParentPidDict[leaderEntityId]
+ except:
+ logger.info("leader details unavailable", extra=d)
+ raise
+
+ logger.info("stopping leader in entity "+leaderEntityId+" with pid "+leaderPPid, extra=d)
+ stop_remote_entity(systemTestEnv, leaderEntityId, leaderPPid)
+
+ # fixed wait; the new leader is looked up from the logs only after this pause
+ logger.info("sleeping for 5s for leader re-election to complete", extra=d)
+ time.sleep(5)
+
+ # get broker shut down completed timestamp
+ shutdownBrokerDict = get_broker_shutdown_log_line(systemTestEnv, testcaseEnv)
+ #print shutdownBrokerDict
+ logger.info("unix timestamp of shut down completed: " + str("{0:.6f}".format(shutdownBrokerDict["timestamp"])), extra=d)
+
+ logger.info("looking up new leader", extra=d)
+ leaderDict2 = get_leader_elected_log_line(systemTestEnv, testcaseEnv)
+ #print leaderDict2
+ logger.info("unix timestamp of new elected leader: " + str("{0:.6f}".format(leaderDict2["timestamp"])), extra=d)
+ # difference of the two timestamps, logged below as seconds and recorded as milliseconds
+ leaderReElectionLatency = float(leaderDict2["timestamp"]) - float(shutdownBrokerDict["timestamp"])
+ logger.info("leader Re-election Latency: " + str(leaderReElectionLatency) + " sec", extra=d)
+ testcaseEnv.validationStatusDict["Leader Election Latency"] = str("{0:.2f}".format(leaderReElectionLatency * 1000)) + " ms"
+
+ return leaderReElectionLatency
+
+
Modified: incubator/kafka/branches/0.8/system_test/utils/metrics.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/metrics.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/metrics.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/metrics.py Thu Aug 30 23:47:55 2012
@@ -31,6 +31,8 @@ import traceback
import csv
import time
+import matplotlib as mpl
+mpl.use('Agg')
import matplotlib.pyplot as plt
from collections import namedtuple
import numpy
@@ -78,6 +80,8 @@ def ensure_valid_headers(headers, attrib
attributeColumnIndex = headers.index(attributes)
return attributeColumnIndex
except ValueError as ve:
+ #print "#### attributes : ", attributes
+ #print "#### headers : ", headers
raise Exception("There should be exactly one column that matches attribute: {0} in".format(attributes) +
" headers: {0}".format(",".join(headers)))
@@ -119,6 +123,7 @@ def plot_graphs(inputCsvFiles, labels, t
plots.append(p1)
except Exception as e:
logger.error("ERROR while plotting data for {0}: {1}".format(inputCsvFile, e), extra=d)
+ traceback.print_exc()
# find xmin, xmax, ymin, ymax from all csv files
xmin = min(map(lambda coord: coord.x, coordinates))
xmax = max(map(lambda coord: coord.x, coordinates))
@@ -172,6 +177,7 @@ def draw_graph_for_role(graphs, entities
# print "Finished plotting graph for metric {0} on entity {1}".format(graph['graph_name'], entity['entity_id'])
except Exception as e:
logger.error("ERROR while plotting graph {0}: {1}".format(outputGraphFile, e), extra=d)
+ traceback.print_exc()
def build_all_dashboards(metricsDefinitionFile, testcaseDashboardsDir, clusterConfig):
metricsHtmlFile = testcaseDashboardsDir + "/metrics.html"
@@ -231,19 +237,29 @@ def start_metrics_collection(jmxHost, jm
startMetricsCommand = " ".join(startMetricsCmdList)
logger.debug("executing command: [" + startMetricsCommand + "]", extra=d)
system_test_utils.async_sys_call(startMetricsCommand)
-
- pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid'"
+ time.sleep(1)
+
+ pidCmdStr = "ssh " + jmxHost + " 'cat " + entityMetricsDir + "/entity_pid 2> /dev/null'"
logger.debug("executing command: [" + pidCmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(pidCmdStr)
- # keep track of the remote entity pid in a dictionary
+ # keep track of JMX ppid in a dictionary of entity_id to list of JMX ppid
+ # testcaseEnv.entityJmxParentPidDict:
+ # key: entity_id
+ # val: list of JMX ppid associated to that entity_id
+ # { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
for line in subproc.stdout.readlines():
+ line = line.rstrip('\n')
+ logger.debug("line: [" + line + "]", extra=d)
if line.startswith("pid"):
- line = line.rstrip('\n')
logger.debug("found pid line: [" + line + "]", extra=d)
tokens = line.split(':')
thisPid = tokens[1]
- testcaseEnv.entityParentPidDict[thisPid] = thisPid
+ if entityId not in testcaseEnv.entityJmxParentPidDict:
+ testcaseEnv.entityJmxParentPidDict[entityId] = []
+ testcaseEnv.entityJmxParentPidDict[entityId].append(thisPid)
+ #print "\n#### testcaseEnv.entityJmxParentPidDict ", testcaseEnv.entityJmxParentPidDict, "\n"
+
def stop_metrics_collection(jmxHost, jmxPort):
logger.info("stopping metrics collection on " + jmxHost + ":" + jmxPort, extra=d)
Modified: incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/system_test_utils.py Thu Aug 30 23:47:55 2012
@@ -24,7 +24,9 @@ import inspect
import json
import logging
import os
+import re
import signal
+import socket
import subprocess
import sys
import time
@@ -34,6 +36,15 @@ thisClassName = '(system_test_utils)'
d = {'name_of_class': thisClassName}
+def get_current_unix_timestamp():
+    # Current time.time() value rendered as a string with exactly six
+    # digits after the decimal point.
+    return "{0:.6f}".format(time.time())
+
+
+def get_local_hostname():
+    # Host name of the local machine as reported by socket.gethostname().
+    localHostname = socket.gethostname()
+    return localHostname
+
+
def sys_call(cmdStr):
output = ""
#logger.info("executing command [" + cmdStr + "]", extra=d)
@@ -218,6 +229,7 @@ def sigterm_remote_process(hostname, pid
sys_call_return_subproc(cmdStr)
except:
print "WARN - pid:",pid,"not found"
+ raise
def sigkill_remote_process(hostname, pidStack):
@@ -231,6 +243,7 @@ def sigkill_remote_process(hostname, pid
sys_call_return_subproc(cmdStr)
except:
print "WARN - pid:",pid,"not found"
+ raise
def terminate_process(pidStack):
@@ -240,6 +253,7 @@ def terminate_process(pidStack):
os.kill(int(pid), signal.SIGTERM)
except:
print "WARN - pid:",pid,"not found"
+ raise
def convert_keyval_to_cmd_args(configFilePathname):
@@ -267,6 +281,17 @@ def sys_call_return_subproc(cmd_str):
return p
+def remote_host_file_exists(hostname, pathname):
+    # Check over ssh whether pathname exists on hostname.
+    #
+    # Returns False when the output of the remote `ls` contains
+    # "No such file or directory", True in every other case (including when
+    # no output is produced at all).
+    #
+    # The remote stderr is folded into stdout (2>&1) so that the ls error
+    # text always shows up on the stream scanned below, independent of how
+    # sys_call_return_subproc wires up stderr.
+    # NOTE(review): the match relies on the English ls message - confirm the
+    # remote hosts do not run with a translated locale.
+    cmdStr = "ssh " + hostname + " 'ls " + pathname + " 2>&1'"
+    logger.debug("executing command: [" + cmdStr + "]", extra=d)
+    subproc = sys_call_return_subproc(cmdStr)
+
+    for line in subproc.stdout.readlines():
+        if "No such file or directory" in line:
+            return False
+    return True
+
+
def remote_host_directory_exists(hostname, path):
cmdStr = "ssh " + hostname + " 'ls -d " + path + "'"
logger.debug("executing command: [" + cmdStr + "]", extra=d)
@@ -296,24 +321,39 @@ def remote_host_processes_stopped(hostna
def setup_remote_hosts(systemTestEnv):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
+ localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
+ localJavaBin = ""
+ localJavaHome = ""
+
+ subproc = sys_call_return_subproc("which java")
+ for line in subproc.stdout.readlines():
+ if line.startswith("which: no "):
+ logger.error("No Java binary found in local host", extra=d)
+ return False
+ else:
+ line = line.rstrip('\n')
+ localJavaBin = line
+ matchObj = re.match("(.*)\/bin\/java$", line)
+ localJavaHome = matchObj.group(1)
+
+ listIndex = -1
for clusterEntityConfigDict in clusterEntityConfigDictList:
+ listIndex += 1
+
hostname = clusterEntityConfigDict["hostname"]
kafkaHome = clusterEntityConfigDict["kafka_home"]
javaHome = clusterEntityConfigDict["java_home"]
- localKafkaHome = os.path.abspath(systemTestEnv.SYSTEM_TEST_BASE_DIR + "/..")
- logger.info("local kafka home : [" + localKafkaHome + "]", extra=d)
- if kafkaHome != localKafkaHome:
- logger.error("kafkaHome [" + kafkaHome + "] must be the same as [" + localKafkaHome + "] in host [" + hostname + "]", extra=d)
- logger.error("please update cluster_config.json and run again. Aborting test ...", extra=d)
- sys.exit(1)
-
- #logger.info("checking running processes in host [" + hostname + "]", extra=d)
- #if not remote_host_processes_stopped(hostname):
- # logger.error("Running processes found in host [" + hostname + "]", extra=d)
- # return False
+ if hostname == "localhost" and javaHome == "default":
+ clusterEntityConfigDictList[listIndex]["java_home"] = localJavaHome
+
+ if hostname == "localhost" and kafkaHome == "default":
+ clusterEntityConfigDictList[listIndex]["kafka_home"] = localKafkaHome
+
+ kafkaHome = clusterEntityConfigDict["kafka_home"]
+ javaHome = clusterEntityConfigDict["java_home"]
- logger.info("checking JAVA_HOME [" + javaHome + "] in host [" + hostname + "]", extra=d)
+ logger.info("checking java binary [" + localJavaBin + "] in host [" + hostname + "]", extra=d)
if not remote_host_directory_exists(hostname, javaHome):
logger.error("Directory not found: [" + javaHome + "] in host [" + hostname + "]", extra=d)
return False
@@ -339,3 +379,22 @@ def copy_source_to_remote_hosts(hostname
for line in subproc.stdout.readlines():
dummyVar = 1
+
+def remove_kafka_home_dir_at_remote_hosts(hostname, kafkaHome):
+    # Prepare the kafka home directory on a remote host for removal.
+    #
+    # hostname : host reached over ssh
+    # kafkaHome: directory on that host expected to hold a kafka installation
+    #
+    # kafkaHome is only touched when it contains bin/kafka-run-class.sh.
+    # Otherwise the command that would have been run is logged as a warning
+    # and the process exits with status 1.
+
+    # Built up front so that the warning branch can report it as well.
+    rmCmdStr = "ssh " + hostname + " 'rm -r " + kafkaHome + "'"
+
+    if not remote_host_file_exists(hostname, kafkaHome + "/bin/kafka-run-class.sh"):
+        logger.warn("possible destructive command [" + rmCmdStr + "]", extra=d)
+        logger.warn("check config file: system_test/cluster_config.json", extra=d)
+        logger.warn("aborting test...", extra=d)
+        sys.exit(1)
+
+    # sys_call is defined in this module, so it is called directly.
+    cmdStr = "ssh " + hostname + " 'chmod -R 777 " + kafkaHome + "'"
+    logger.info("executing command [" + cmdStr + "]", extra=d)
+    sys_call(cmdStr)
+
+    logger.info("executing command [" + rmCmdStr + "]", extra=d)
+    # NOTE(review): the rm call stays disabled, exactly as before; only the
+    # chmod above is really executed. Confirm whether it should be enabled.
+    #sys_call(rmCmdStr)
+
+
+
Modified: incubator/kafka/branches/0.8/system_test/utils/testcase_env.py
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/utils/testcase_env.py?rev=1379232&r1=1379231&r2=1379232&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/utils/testcase_env.py (original)
+++ incubator/kafka/branches/0.8/system_test/utils/testcase_env.py Thu Aug 30 23:47:55 2012
@@ -30,9 +30,30 @@ class TestcaseEnv():
# Generic testcase environment
# ================================
- # dictionary of entity parent pid
+ # dictionary of entity_id to ppid for entities such as zookeepers & brokers
+ # key: entity_id
+ # val: ppid of zk or broker associated to that entity_id
+ # { 0: 12345, 1: 12389, ... }
entityParentPidDict = {}
+ # dictionary of entity_id to list of JMX ppid
+ # key: entity_id
+ # val: list of JMX ppid associated to that entity_id
+ # { 1: [1234, 1235, 1236], 2: [2234, 2235, 2236], ... }
+ entityJmxParentPidDict = {}
+
+ # dictionary of hostname-topic-ppid for consumer
+ # key: hostname
+ # val: dict of topic-ppid
+ # { host1: { test1 : 12345, test2 : 12389 }, host2: { test1 : 22345 }, ... }
+ consumerHostParentPidDict = {}
+
+ # dictionary of hostname-topic-ppid for producer
+ # key: hostname
+ # val: dict of topic-ppid
+ # { host1: { test1 : 12345, test2 : 12389 }, host2: { test1 : 22345 }, ... }
+ producerHostParentPidDict = {}
+
# list of testcase configs
testcaseConfigsList = []