You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/15 19:47:28 UTC

git commit: KAFKA-1582; System Test should wait for producer to finish; reviewed by Joel Koshy and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk caf256ad8 -> d678449b9


KAFKA-1582; System Test should wait for producer to finish; reviewed by Joel Koshy and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d678449b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d678449b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d678449b

Branch: refs/heads/trunk
Commit: d678449b967ed92a5a02289c061f201da25b07de
Parents: caf256a
Author: Dong Lin <li...@gmail.com>
Authored: Fri Aug 15 10:46:34 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Aug 15 10:46:34 2014 -0700

----------------------------------------------------------------------
 system_test/utils/kafka_system_test_utils.py | 34 ++++++++++++-----------
 1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d678449b/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 6edd64a..fcacf0a 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -792,19 +792,19 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
         except:
             pass
 
-	# 4. consumer config
-	consumerProperties = {}
-	consumerProperties["consumer.timeout.ms"] = timeoutMs
-	try:
+        # 4. consumer config
+        consumerProperties = {}
+        consumerProperties["consumer.timeout.ms"] = timeoutMs
+        try:
             groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id")
             consumerProperties["group.id"] = groupOption
         except:
             pass
 
-	props_file_path=write_consumer_properties(consumerProperties)
-	scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/"
-	logger.debug("executing command [" + scpCmdStr + "]", extra=d)
-	system_test_utils.sys_call(scpCmdStr)
+        props_file_path=write_consumer_properties(consumerProperties)
+        scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/"
+        logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+        system_test_utils.sys_call(scpCmdStr)
 
         if len(formatterOption) > 0:
             formatterOption = " --formatter " + formatterOption + " "
@@ -930,12 +930,12 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
             logger.error("Invalid cluster name : " + clusterName, extra=d)
             sys.exit(1)
 
-	consumerProperties = {}
-	consumerProperties["consumer.timeout.ms"] = timeoutMs
-	props_file_path=write_consumer_properties(consumerProperties)
-	scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/"
-	logger.debug("executing command [" + scpCmdStr + "]", extra=d)
-	system_test_utils.sys_call(scpCmdStr)
+        consumerProperties = {}
+        consumerProperties["consumer.timeout.ms"] = timeoutMs
+        props_file_path=write_consumer_properties(consumerProperties)
+        scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/"
+        logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+        system_test_utils.sys_call(scpCmdStr)
 
         cmdList = ["ssh " + host,
                    "'JAVA_HOME=" + javaHome,
@@ -1136,7 +1136,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
                        "--metrics-dir " + metricsDir,
                        boolArgumentsStr,
                        " >> " + producerLogPathName,
-                       " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+                       " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid",
+                       " & wait'"]
 
             if kafka07Client:
                 cmdList[:] = []
@@ -1167,7 +1168,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
                        "--message-size " + messageSize,
                        "--vary-message-size --async",
                        " >> " + producerLogPathName,
-                       " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+                       " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid",
+                       " & wait'"]
 
             cmdStr = " ".join(cmdList)
             logger.debug("executing command: [" + cmdStr + "]", extra=d)