You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/27 23:15:46 UTC
git commit: kafka-1212;
System test exception handling does not stop background producer
threads; patched by Guozhang Wang;
reviewed by Neha Narkhede, Joel Koshy, and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 5e2a9a560 -> eb6da5749
kafka-1212; System test exception handling does not stop background producer threads; patched by Guozhang Wang; reviewed by Neha Narkhede, Joel Koshy, and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb6da574
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb6da574
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb6da574
Branch: refs/heads/trunk
Commit: eb6da57492caad7a6b71692ad184a95c89035b67
Parents: 5e2a9a5
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Feb 27 14:15:41 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 27 14:15:41 2014 -0800
----------------------------------------------------------------------
system_test/utils/kafka_system_test_utils.py | 33 ++++++++++++++++++++---
1 file changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb6da574/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index fb4a9c0..5d2b7df 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -933,6 +933,9 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
jmxPort = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port")
kafkaRunClassBin = kafkaHome + "/bin/kafka-run-class.sh"
+ # first keep track of its pid
+ testcaseEnv.producerHostParentPidDict[entityId] = os.getpid()
+
# get optional testcase arguments
numTopicsForAutoGenString = -1
try:
@@ -1090,6 +1093,9 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
testcaseEnv.lock.release()
time.sleep(1)
+ # finally, remove this producer's entry from the tracked pids
+ del testcaseEnv.producerHostParentPidDict[entityId]
+
def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM"):
clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
@@ -1481,9 +1487,30 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
entityConfigs = systemTestEnv.clusterEntityConfigDictList
- for hostname, producerPPid in testcaseEnv.producerHostParentPidDict.items():
- producerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")
- stop_remote_entity(systemTestEnv, producerEntityId, producerPPid)
+ # If any local threads are still alive and keep starting remote producer performance runs, we need to stop them;
+ # note that we do not need to stop the remote processes, since they will eventually terminate on their own.
+ if len(testcaseEnv.producerHostParentPidDict) != 0:
+ # =============================================
+ # tell producer to stop
+ # =============================================
+ testcaseEnv.lock.acquire()
+ testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+ testcaseEnv.lock.release()
+
+ # =============================================
+ # wait for producer thread's update of
+ # "backgroundProducerStopped" to be "True"
+ # =============================================
+ while 1:
+ testcaseEnv.lock.acquire()
+ logger.info("status of backgroundProducerStopped : [" + \
+ str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d)
+ if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+ logger.info("all producer threads completed", extra=d)
+ break
+ testcaseEnv.lock.release()
+
+ testcaseEnv.producerHostParentPidDict.clear()
for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items():
consumerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")