You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/02/27 23:15:46 UTC

git commit: kafka-1212; System test exception handling does not stop background producer threads; patched by Guozhang Wang; reviewed by Neha Narkhede, Joel Koshy, and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 5e2a9a560 -> eb6da5749


kafka-1212; System test exception handling does not stop background producer threads; patched by Guozhang Wang; reviewed by Neha Narkhede, Joel Koshy, and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb6da574
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb6da574
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb6da574

Branch: refs/heads/trunk
Commit: eb6da57492caad7a6b71692ad184a95c89035b67
Parents: 5e2a9a5
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Feb 27 14:15:41 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Feb 27 14:15:41 2014 -0800

----------------------------------------------------------------------
 system_test/utils/kafka_system_test_utils.py | 33 ++++++++++++++++++++---
 1 file changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/eb6da574/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index fb4a9c0..5d2b7df 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -933,6 +933,9 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
     jmxPort           = system_test_utils.get_data_by_lookup_keyval(entityConfigList, "entity_id", entityId, "jmx_port")
     kafkaRunClassBin  = kafkaHome + "/bin/kafka-run-class.sh"
 
+    # first keep track of its pid
+    testcaseEnv.producerHostParentPidDict[entityId] = os.getpid()
+
     # get optional testcase arguments
     numTopicsForAutoGenString = -1
     try:
@@ -1090,6 +1093,9 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
             testcaseEnv.lock.release()
         time.sleep(1)
 
+    # finally remove this entity from the tracked pids
+    del testcaseEnv.producerHostParentPidDict[entityId]
+
 def stop_remote_entity(systemTestEnv, entityId, parentPid, signalType="SIGTERM"):
     clusterEntityConfigDictList = systemTestEnv.clusterEntityConfigDictList
 
@@ -1481,9 +1487,30 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
 
     entityConfigs = systemTestEnv.clusterEntityConfigDictList
 
-    for hostname, producerPPid in testcaseEnv.producerHostParentPidDict.items():
-        producerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")
-        stop_remote_entity(systemTestEnv, producerEntityId, producerPPid)
+    # If any local threads that keep starting remote producer performance runs are still alive, we need to tell them to stop;
+    # note that we do not need to stop the remote processes, since they will terminate by themselves eventually.
+    if len(testcaseEnv.producerHostParentPidDict) != 0:
+        # =============================================
+        # tell producer to stop
+        # =============================================
+        testcaseEnv.lock.acquire()
+        testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+        testcaseEnv.lock.release()
+
+        # =============================================
+        # wait for producer thread's update of
+        # "backgroundProducerStopped" to be "True"
+        # =============================================
+        while 1:
+            testcaseEnv.lock.acquire()
+            logger.info("status of backgroundProducerStopped : [" + \
+                str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d)
+            if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+                logger.info("all producer threads completed", extra=d)
+                break
+            testcaseEnv.lock.release()
+    
+        testcaseEnv.producerHostParentPidDict.clear()
 
     for hostname, consumerPPid in testcaseEnv.consumerHostParentPidDict.items():
         consumerEntityId = system_test_utils.get_data_by_lookup_keyval(entityConfigs, "hostname", hostname, "entity_id")