You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/01 20:40:13 UTC
git commit: kafka-1318;
waiting for producer to stop is not reliable in system tests;
patched by Jun Rao; reviewed by Guozhang Wang, Timothy Chen and Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 083b6265c -> bd784aeb2
kafka-1318; waiting for producer to stop is not reliable in system tests; patched by Jun Rao; reviewed by Guozhang Wang, Timothy Chen and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd784aeb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd784aeb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd784aeb
Branch: refs/heads/trunk
Commit: bd784aeb20b6a4252675c681116b47ea86402130
Parents: 083b626
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Apr 1 11:40:03 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Apr 1 11:40:03 2014 -0700
----------------------------------------------------------------------
system_test/README.txt | 5 +-
.../0.7/config/log4j.properties | 78 ++++++++++++++++++++
.../config/migration_producer.properties | 2 +
.../migration_tool_test.py | 5 --
system_test/utils/kafka_system_test_utils.py | 23 ++++--
5 files changed, 100 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/README.txt
----------------------------------------------------------------------
diff --git a/system_test/README.txt b/system_test/README.txt
index 87937ec..f9972d1 100644
--- a/system_test/README.txt
+++ b/system_test/README.txt
@@ -54,9 +54,8 @@ The framework has the following levels:
1. Update system_test/cluster_config.json for "kafka_home" & "java_home" specific to your environment
2. Edit system_test/replication_testsuite/testcase_1/testcase_1_properties.json and update "broker-list" to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
3. To run the test, go to <kafka_home>/system_test and run the following command:
- $ python -B system_test_runner.py
- 4. To turn on debugging, update system_test/system_test_runner.py and uncomment the following line:
- namedLogger.setLevel(logging.DEBUG)
+ $ python -u -B system_test_runner.py 2>&1 | tee system_test_output.log
+ 4. To turn on debugging, update system_test/logging.conf by changing the level in the handlers section from INFO to DEBUG.
# ==========================
# Adding Test Case
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/migration_tool_testsuite/0.7/config/log4j.properties
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/0.7/config/log4j.properties b/system_test/migration_tool_testsuite/0.7/config/log4j.properties
new file mode 100644
index 0000000..baa698b
--- /dev/null
+++ b/system_test/migration_tool_testsuite/0.7/config/log4j.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kafka.logs.dir=logs
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/migration_tool_testsuite/config/migration_producer.properties
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/config/migration_producer.properties b/system_test/migration_tool_testsuite/config/migration_producer.properties
index 17b5928..7a2265a 100644
--- a/system_test/migration_tool_testsuite/config/migration_producer.properties
+++ b/system_test/migration_tool_testsuite/config/migration_producer.properties
@@ -37,6 +37,8 @@ metadata.broker.list=localhost:9094,localhost:9095,localhost:9096
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
+retry.backoff.ms=500
+
# specify the compression codec for all data generated: 0: no compression, 1: gzip
compression.codec=0
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/migration_tool_testsuite/migration_tool_test.py
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/migration_tool_test.py b/system_test/migration_tool_testsuite/migration_tool_test.py
index 2386a58..9594835 100644
--- a/system_test/migration_tool_testsuite/migration_tool_test.py
+++ b/system_test/migration_tool_testsuite/migration_tool_test.py
@@ -170,11 +170,6 @@ class MigrationToolTest(ReplicationUtils, SetupUtils):
self.anonLogger.info("sleeping for 5s")
time.sleep(5)
- self.log_message("creating topics")
- kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
- self.anonLogger.info("sleeping for 5s")
- time.sleep(5)
-
# =============================================
# starting producer
# =============================================
http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 35f2d1b..3ab758a 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -734,7 +734,8 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
elif role == "broker":
cmdList = ["ssh " + hostname,
"'JAVA_HOME=" + javaHome,
- "JMX_PORT=" + jmxPort,
+ "JMX_PORT=" + jmxPort,
+ "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/log4j.properties" % kafkaHome,
kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
configPathName + "/" + configFile + " >> ",
logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -975,10 +976,12 @@ def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client):
role = producerConfig["role"]
thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client))
+ logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
testcaseEnv.lock.acquire()
testcaseEnv.numProducerThreadsRunning += 1
logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
time.sleep(1)
+ logger.debug("calling testcaseEnv.lock.release()", extra=d)
testcaseEnv.lock.release()
def generate_topics_string(topicPrefix, numOfTopics):
@@ -1119,7 +1122,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
"--metrics-dir " + metricsDir,
boolArgumentsStr,
" >> " + producerLogPathName,
- " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+ " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
if kafka07Client:
cmdList[:] = []
@@ -1150,17 +1153,19 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
"--message-size " + messageSize,
"--vary-message-size --async",
" >> " + producerLogPathName,
- " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+ " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
cmdStr = " ".join(cmdList)
logger.debug("executing command: [" + cmdStr + "]", extra=d)
subproc = system_test_utils.sys_call_return_subproc(cmdStr)
- for line in subproc.stdout.readlines():
- pass # dummy loop to wait until producer is completed
+ logger.debug("waiting for producer to finish", extra=d)
+ subproc.communicate()
+ logger.debug("producer finished", extra=d)
else:
testcaseEnv.numProducerThreadsRunning -= 1
logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
+ logger.debug("calling testcaseEnv.lock.release()", extra=d)
testcaseEnv.lock.release()
break
@@ -1172,14 +1177,17 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
# wait until other producer threads also stops and
# let the main testcase know all producers have stopped
while 1:
+ logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
testcaseEnv.lock.acquire()
time.sleep(1)
if testcaseEnv.numProducerThreadsRunning == 0:
testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True
+ logger.debug("calling testcaseEnv.lock.release()", extra=d)
testcaseEnv.lock.release()
break
else:
logger.debug("waiting for TRUE of testcaseEnv.userDefinedEnvVarDict['backgroundProducerStopped']", extra=d)
+ logger.debug("calling testcaseEnv.lock.release()", extra=d)
testcaseEnv.lock.release()
time.sleep(1)
@@ -1617,8 +1625,10 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
# =============================================
# tell producer to stop
# =============================================
+ logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
testcaseEnv.lock.acquire()
testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+ logger.debug("calling testcaseEnv.lock.release()", extra=d)
testcaseEnv.lock.release()
# =============================================
@@ -1626,13 +1636,16 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
# "backgroundProducerStopped" to be "True"
# =============================================
while 1:
+ logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
testcaseEnv.lock.acquire()
logger.info("status of backgroundProducerStopped : [" + \
str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d)
if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+ logger.debug("calling testcaseEnv.lock.release()", extra=d)
testcaseEnv.lock.release()
logger.info("all producer threads completed", extra=d)
break
+ logger.debug("calling testcaseEnv.lock.release()", extra=d)
testcaseEnv.lock.release()
testcaseEnv.producerHostParentPidDict.clear()