You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/15 19:47:28 UTC
git commit: KAFKA-1582; System Test should wait for producer to finish;
reviewed by Joel Koshy and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk caf256ad8 -> d678449b9
KAFKA-1582; System Test should wait for producer to finish; reviewed by Joel Koshy and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d678449b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d678449b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d678449b
Branch: refs/heads/trunk
Commit: d678449b967ed92a5a02289c061f201da25b07de
Parents: caf256a
Author: Dong Lin <li...@gmail.com>
Authored: Fri Aug 15 10:46:34 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Aug 15 10:46:34 2014 -0700
----------------------------------------------------------------------
system_test/utils/kafka_system_test_utils.py | 34 ++++++++++++-----------
1 file changed, 18 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d678449b/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 6edd64a..fcacf0a 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -792,19 +792,19 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
except:
pass
- # 4. consumer config
- consumerProperties = {}
- consumerProperties["consumer.timeout.ms"] = timeoutMs
- try:
+ # 4. consumer config
+ consumerProperties = {}
+ consumerProperties["consumer.timeout.ms"] = timeoutMs
+ try:
groupOption = system_test_utils.get_data_by_lookup_keyval(testcaseConfigsList, "entity_id", entityId, "group.id")
consumerProperties["group.id"] = groupOption
except:
pass
- props_file_path=write_consumer_properties(consumerProperties)
- scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/"
- logger.debug("executing command [" + scpCmdStr + "]", extra=d)
- system_test_utils.sys_call(scpCmdStr)
+ props_file_path=write_consumer_properties(consumerProperties)
+ scpCmdStr = "scp "+ props_file_path +" "+ hostname + ":/tmp/"
+ logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+ system_test_utils.sys_call(scpCmdStr)
if len(formatterOption) > 0:
formatterOption = " --formatter " + formatterOption + " "
@@ -930,12 +930,12 @@ def start_console_consumer(systemTestEnv, testcaseEnv):
logger.error("Invalid cluster name : " + clusterName, extra=d)
sys.exit(1)
- consumerProperties = {}
- consumerProperties["consumer.timeout.ms"] = timeoutMs
- props_file_path=write_consumer_properties(consumerProperties)
- scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/"
- logger.debug("executing command [" + scpCmdStr + "]", extra=d)
- system_test_utils.sys_call(scpCmdStr)
+ consumerProperties = {}
+ consumerProperties["consumer.timeout.ms"] = timeoutMs
+ props_file_path=write_consumer_properties(consumerProperties)
+ scpCmdStr = "scp "+ props_file_path +" "+ host + ":/tmp/"
+ logger.debug("executing command [" + scpCmdStr + "]", extra=d)
+ system_test_utils.sys_call(scpCmdStr)
cmdList = ["ssh " + host,
"'JAVA_HOME=" + javaHome,
@@ -1136,7 +1136,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
"--metrics-dir " + metricsDir,
boolArgumentsStr,
" >> " + producerLogPathName,
- " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+ " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid",
+ " & wait'"]
if kafka07Client:
cmdList[:] = []
@@ -1167,7 +1168,8 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
"--message-size " + messageSize,
"--vary-message-size --async",
" >> " + producerLogPathName,
- " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+ " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid",
+ " & wait'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)