You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/01 20:40:13 UTC

git commit: kafka-1318; waiting for producer to stop is not reliable in system tests; patched by Jun Rao; reviewed by Guozhang Wang, Timothy Chen and Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 083b6265c -> bd784aeb2


kafka-1318; waiting for producer to stop is not reliable in system tests; patched by Jun Rao; reviewed by Guozhang Wang, Timothy Chen and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bd784aeb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bd784aeb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bd784aeb

Branch: refs/heads/trunk
Commit: bd784aeb20b6a4252675c681116b47ea86402130
Parents: 083b626
Author: Jun Rao <ju...@gmail.com>
Authored: Tue Apr 1 11:40:03 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Apr 1 11:40:03 2014 -0700

----------------------------------------------------------------------
 system_test/README.txt                          |  5 +-
 .../0.7/config/log4j.properties                 | 78 ++++++++++++++++++++
 .../config/migration_producer.properties        |  2 +
 .../migration_tool_test.py                      |  5 --
 system_test/utils/kafka_system_test_utils.py    | 23 ++++--
 5 files changed, 100 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/README.txt
----------------------------------------------------------------------
diff --git a/system_test/README.txt b/system_test/README.txt
index 87937ec..f9972d1 100644
--- a/system_test/README.txt
+++ b/system_test/README.txt
@@ -54,9 +54,8 @@ The framework has the following levels:
   1. Update system_test/cluster_config.json for "kafka_home" & "java_home" specific to your environment
   2. Edit system_test/replication_testsuite/testcase_1/testcase_1_properties.json and update "broker-list" to the proper settings of your environment. (If this test is to be run in a single localhost, no change is required for this.)
   3. To run the test, go to <kafka_home>/system_test and run the following command:
-     $ python -B system_test_runner.py 
-  4. To turn on debugging, update system_test/system_test_runner.py and uncomment the following line:
-         namedLogger.setLevel(logging.DEBUG)
+     $ python -u -B system_test_runner.py 2>&1 | tee system_test_output.log
+  4. To turn on debugging, update system_test/logging.conf by changing the level in the handlers section from INFO to DEBUG.
 
 # ==========================
 # Adding Test Case

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/migration_tool_testsuite/0.7/config/log4j.properties
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/0.7/config/log4j.properties b/system_test/migration_tool_testsuite/0.7/config/log4j.properties
new file mode 100644
index 0000000..baa698b
--- /dev/null
+++ b/system_test/migration_tool_testsuite/0.7/config/log4j.properties
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kafka.logs.dir=logs
+
+log4j.rootLogger=INFO, stdout 
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
+log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.stateChangeAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.stateChangeAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.stateChangeAppender.File=${kafka.logs.dir}/state-change.log
+log4j.appender.stateChangeAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.stateChangeAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
+log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.cleanerAppender.File=log-cleaner.log
+log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.appender.controllerAppender=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.controllerAppender.DatePattern='.'yyyy-MM-dd-HH
+log4j.appender.controllerAppender.File=${kafka.logs.dir}/controller.log
+log4j.appender.controllerAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.controllerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+# Turn on all our debugging info
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG, kafkaAppender
+#log4j.logger.kafka.client.ClientUtils=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf=DEBUG, kafkaAppender
+#log4j.logger.kafka.perf.ProducerPerformance$ProducerThread=DEBUG, kafkaAppender
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+log4j.logger.kafka=INFO, kafkaAppender
+
+log4j.logger.kafka.network.RequestChannel$=WARN, requestAppender
+log4j.additivity.kafka.network.RequestChannel$=false
+
+#log4j.logger.kafka.network.Processor=TRACE, requestAppender
+#log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
+#log4j.additivity.kafka.server.KafkaApis=false
+log4j.logger.kafka.request.logger=WARN, requestAppender
+log4j.additivity.kafka.request.logger=false
+
+log4j.logger.kafka.controller=TRACE, controllerAppender
+log4j.additivity.kafka.controller=false
+
+log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
+log4j.additivity.kafka.log.LogCleaner=false
+
+log4j.logger.state.change.logger=TRACE, stateChangeAppender
+log4j.additivity.state.change.logger=false

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/migration_tool_testsuite/config/migration_producer.properties
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/config/migration_producer.properties b/system_test/migration_tool_testsuite/config/migration_producer.properties
index 17b5928..7a2265a 100644
--- a/system_test/migration_tool_testsuite/config/migration_producer.properties
+++ b/system_test/migration_tool_testsuite/config/migration_producer.properties
@@ -37,6 +37,8 @@ metadata.broker.list=localhost:9094,localhost:9095,localhost:9096
 # specifies whether the messages are sent asynchronously (async) or synchronously (sync)
 producer.type=sync
 
+retry.backoff.ms=500
+
 # specify the compression codec for all data generated: 0: no compression, 1: gzip
 compression.codec=0
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/migration_tool_testsuite/migration_tool_test.py
----------------------------------------------------------------------
diff --git a/system_test/migration_tool_testsuite/migration_tool_test.py b/system_test/migration_tool_testsuite/migration_tool_test.py
index 2386a58..9594835 100644
--- a/system_test/migration_tool_testsuite/migration_tool_test.py
+++ b/system_test/migration_tool_testsuite/migration_tool_test.py
@@ -170,11 +170,6 @@ class MigrationToolTest(ReplicationUtils, SetupUtils):
                 self.anonLogger.info("sleeping for 5s")
                 time.sleep(5)
 
-                self.log_message("creating topics")
-                kafka_system_test_utils.create_topic_for_producer_performance(self.systemTestEnv, self.testcaseEnv)
-                self.anonLogger.info("sleeping for 5s")
-                time.sleep(5)
-
                 # =============================================
                 # starting producer 
                 # =============================================

http://git-wip-us.apache.org/repos/asf/kafka/blob/bd784aeb/system_test/utils/kafka_system_test_utils.py
----------------------------------------------------------------------
diff --git a/system_test/utils/kafka_system_test_utils.py b/system_test/utils/kafka_system_test_utils.py
index 35f2d1b..3ab758a 100644
--- a/system_test/utils/kafka_system_test_utils.py
+++ b/system_test/utils/kafka_system_test_utils.py
@@ -734,7 +734,8 @@ def start_entity_in_background(systemTestEnv, testcaseEnv, entityId):
     elif role == "broker":
         cmdList = ["ssh " + hostname,
                   "'JAVA_HOME=" + javaHome,
-                 "JMX_PORT=" + jmxPort,
+                  "JMX_PORT=" + jmxPort,
+                  "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%s/config/log4j.properties" % kafkaHome,
                   kafkaHome + "/bin/kafka-run-class.sh kafka.Kafka",
                   configPathName + "/" + configFile + " >> ",
                   logPathName + "/" + logFile + " & echo pid:$! > ",
@@ -975,10 +976,12 @@ def start_producer_performance(systemTestEnv, testcaseEnv, kafka07Client):
         role              = producerConfig["role"] 
 
         thread.start_new_thread(start_producer_in_thread, (testcaseEnv, entityConfigList, producerConfig, kafka07Client))
+        logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
         testcaseEnv.lock.acquire()
         testcaseEnv.numProducerThreadsRunning += 1
         logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
         time.sleep(1)
+        logger.debug("calling testcaseEnv.lock.release()", extra=d)
         testcaseEnv.lock.release()
 
 def generate_topics_string(topicPrefix, numOfTopics):
@@ -1119,7 +1122,7 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
                        "--metrics-dir " + metricsDir,
                        boolArgumentsStr,
                        " >> " + producerLogPathName,
-                       " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+                       " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
 
             if kafka07Client:
                 cmdList[:] = []
@@ -1150,17 +1153,19 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
                        "--message-size " + messageSize,
                        "--vary-message-size --async",
                        " >> " + producerLogPathName,
-                       " & echo pid:$! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
+                       " & echo $! > " + producerLogPath + "/entity_" + entityId + "_pid'"]
 
             cmdStr = " ".join(cmdList)
             logger.debug("executing command: [" + cmdStr + "]", extra=d)
 
             subproc = system_test_utils.sys_call_return_subproc(cmdStr)
-            for line in subproc.stdout.readlines():
-                pass    # dummy loop to wait until producer is completed
+            logger.debug("waiting for producer to finish", extra=d)
+            subproc.communicate()
+            logger.debug("producer finished", extra=d)
         else:
             testcaseEnv.numProducerThreadsRunning -= 1
             logger.debug("testcaseEnv.numProducerThreadsRunning : " + str(testcaseEnv.numProducerThreadsRunning), extra=d)
+            logger.debug("calling testcaseEnv.lock.release()", extra=d)
             testcaseEnv.lock.release()
             break
 
@@ -1172,14 +1177,17 @@ def start_producer_in_thread(testcaseEnv, entityConfigList, producerConfig, kafk
     # wait until other producer threads also stops and
     # let the main testcase know all producers have stopped
     while 1:
+        logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
         testcaseEnv.lock.acquire()
         time.sleep(1)
         if testcaseEnv.numProducerThreadsRunning == 0:
             testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"] = True
+            logger.debug("calling testcaseEnv.lock.release()", extra=d)
             testcaseEnv.lock.release()
             break
         else:
             logger.debug("waiting for TRUE of testcaseEnv.userDefinedEnvVarDict['backgroundProducerStopped']", extra=d)
+            logger.debug("calling testcaseEnv.lock.release()", extra=d)
             testcaseEnv.lock.release()
         time.sleep(1)
 
@@ -1617,8 +1625,10 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
         # =============================================
         # tell producer to stop
         # =============================================
+        logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
         testcaseEnv.lock.acquire()
         testcaseEnv.userDefinedEnvVarDict["stopBackgroundProducer"] = True
+        logger.debug("calling testcaseEnv.lock.release()", extra=d)
         testcaseEnv.lock.release()
 
         # =============================================
@@ -1626,13 +1636,16 @@ def stop_all_remote_running_processes(systemTestEnv, testcaseEnv):
         # "backgroundProducerStopped" to be "True"
         # =============================================
         while 1:
+            logger.debug("calling testcaseEnv.lock.acquire()", extra=d)
             testcaseEnv.lock.acquire()
             logger.info("status of backgroundProducerStopped : [" + \
                 str(testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]) + "]", extra=d)
             if testcaseEnv.userDefinedEnvVarDict["backgroundProducerStopped"]:
+                logger.debug("calling testcaseEnv.lock.release()", extra=d)
                 testcaseEnv.lock.release()
                 logger.info("all producer threads completed", extra=d)
                 break
+            logger.debug("calling testcaseEnv.lock.release()", extra=d)
             testcaseEnv.lock.release()
     
         testcaseEnv.producerHostParentPidDict.clear()