You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/08/12 00:23:35 UTC

kafka git commit: KAFKA-2408: ConsoleConsumerService direct log output to file

Repository: kafka
Updated Branches:
  refs/heads/trunk 96534a7d5 -> be633a713


KAFKA-2408: ConsoleConsumerService direct log output to file

The console consumer writes consumed messages to System.out, while some log4j loggers run in other threads.

This occasionally produced interleaved output that disrupted ConsoleConsumerService's parsing of consumed messages, causing spurious test failures.

This fix directs log output to a separate file.

Author: Geoff Anderson <ge...@confluent.io>

Reviewers: Ewen Cheslack-Postava

Closes #123 from granders/KAFKA-2408 and squashes the following commits:

247b0e0 [Geoff Anderson] Updated line counting to use wc -l
66d6f4f [Geoff Anderson] lower -> uppercase constants
e67f554 [Geoff Anderson] Changed incorrect license header
af67e01 [Geoff Anderson] Merged in upstream trunk
8f89044 [Geoff Anderson] Added another lifecycle check. Wait for log file to exist before examining contents.
521a84b [Geoff Anderson] Updated console consumer to direct log output to a file rather than stdout


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be633a71
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be633a71
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be633a71

Branch: refs/heads/trunk
Commit: be633a713e311c90ebfafc650eb3dcfb94ce372d
Parents: 96534a7
Author: Geoff Anderson <ge...@confluent.io>
Authored: Tue Aug 11 15:24:52 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Aug 11 15:24:52 2015 -0700

----------------------------------------------------------------------
 tests/kafkatest/sanity_checks/__init__.py       | 14 ++++
 .../sanity_checks/test_console_consumer.py      | 80 ++++++++++++++++++++
 tests/kafkatest/services/console_consumer.py    | 61 ++++++++++++---
 .../templates/console_consumer.properties       |  2 +-
 .../templates/console_consumer_log4j.properties | 26 +++++++
 5 files changed, 171 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/be633a71/tests/kafkatest/sanity_checks/__init__.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/__init__.py b/tests/kafkatest/sanity_checks/__init__.py
new file mode 100644
index 0000000..91eacc9
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/be633a71/tests/kafkatest/sanity_checks/test_console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
new file mode 100644
index 0000000..cd8c8f9
--- /dev/null
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.tests.test import Test
+from ducktape.utils.util import wait_until
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+
+import time
+
+
+def file_exists(node, file):
+    """Quick and dirty check for existence of remote file."""
+    try:
+        node.account.ssh("cat " + file, allow_fail=False)
+        return True
+    except:
+        return False
+
+
+def line_count(node, file):
+    """Return the line count of file on node"""
+    out = [line for line in node.account.ssh_capture("wc -l %s" % file)]
+    if len(out) != 1:
+        raise Exception("Expected single line of output from wc -l")
+
+    return int(out[0].strip().split(" ")[0])
+
+
+class ConsoleConsumerTest(Test):
+    """Sanity checks on console consumer service class."""
+    def __init__(self, test_context):
+        super(ConsoleConsumerTest, self).__init__(test_context)
+
+        self.topic = "topic"
+        self.zk = ZookeeperService(test_context, num_nodes=1)
+        self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+                                  topics={self.topic: {"partitions": 1, "replication-factor": 1}})
+        self.consumer = ConsoleConsumer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)
+
+    def setUp(self):
+        self.zk.start()
+        self.kafka.start()
+
+    def test_lifecycle(self):
+        t0 = time.time()
+        self.consumer.start()
+        node = self.consumer.nodes[0]
+
+        if not wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
+            raise Exception("Consumer was too slow to start")
+        self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
+
+        # Verify that log output is happening
+        if not wait_until(lambda: file_exists(node, ConsoleConsumer.LOG_FILE), timeout_sec=10):
+            raise Exception("Timed out waiting for log file to exist")
+        assert line_count(node, ConsoleConsumer.LOG_FILE) > 0
+
+        # Verify no consumed messages
+        assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
+
+        self.consumer.stop_node(node)
+        if not wait_until(lambda: not self.consumer.alive(node), timeout_sec=10, backoff_sec=.2):
+            raise Exception("Took too long for consumer to die.")
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/be633a71/tests/kafkatest/services/console_consumer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py
index 33ef4ea..18c9f63 100644
--- a/tests/kafkatest/services/console_consumer.py
+++ b/tests/kafkatest/services/console_consumer.py
@@ -15,6 +15,8 @@
 
 from ducktape.services.background_thread import BackgroundThreadService
 
+import os
+
 
 def is_int(msg):
     """Default method used to check whether text pulled from console consumer is a message.
@@ -26,7 +28,6 @@ def is_int(msg):
     except:
         return None
 
-
 """
 0.8.2.1 ConsoleConsumer options
 
@@ -69,9 +70,24 @@ Option                                  Description
 
 
 class ConsoleConsumer(BackgroundThreadService):
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/console_consumer"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "console_consumer.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "console_consumer.properties")
+
     logs = {
+        "consumer_stdout": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": False},
+        "consumer_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": False},
         "consumer_log": {
-            "path": "/mnt/consumer.log",
+            "path": LOG_FILE,
             "collect_default": True}
         }
 
@@ -104,18 +120,37 @@ class ConsoleConsumer(BackgroundThreadService):
     @property
     def start_cmd(self):
         args = self.args.copy()
-        args.update({'zk_connect': self.kafka.zk.connect_setting()})
-        cmd = "/opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
-              " --consumer.config /mnt/console_consumer.properties" % args
+        args['zk_connect'] = self.kafka.zk.connect_setting()
+        args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
+        args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
+        args['config_file'] = ConsoleConsumer.CONFIG_FILE
+
+        cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
+        cmd += " /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s --zookeeper %(zk_connect)s" \
+            " --consumer.config %(config_file)s" % args
 
         if self.from_beginning:
             cmd += " --from-beginning"
 
-        cmd += " 2>> /mnt/consumer.log | tee -a /mnt/consumer.log &"
+        cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
         return cmd
 
+    def pids(self, node):
+        try:
+            cmd = "ps ax | grep -i console_consumer | grep java | grep -v grep | awk '{print $1}'"
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except:
+            return []
+
+    def alive(self, node):
+        return len(self.pids(node)) > 0
+
     def _worker(self, idx, node):
-        # form config file
+        node.account.ssh("mkdir -p %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
+
+        # Create and upload config file
         if self.consumer_timeout_ms is not None:
             prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms)
         else:
@@ -123,12 +158,16 @@ class ConsoleConsumer(BackgroundThreadService):
 
         self.logger.info("console_consumer.properties:")
         self.logger.info(prop_file)
-        node.account.create_file("/mnt/console_consumer.properties", prop_file)
+        node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
+
+        # Create and upload log properties
+        log_config = self.render('console_consumer_log4j.properties', log_file=ConsoleConsumer.LOG_FILE)
+        node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
 
         # Run and capture output
         cmd = self.start_cmd
         self.logger.debug("Console consumer %d command: %s", idx, cmd)
-        for line in node.account.ssh_capture(cmd):
+        for line in node.account.ssh_capture(cmd, allow_fail=False):
             msg = line.strip()
             msg = self.message_validator(msg)
             if msg is not None:
@@ -139,8 +178,8 @@ class ConsoleConsumer(BackgroundThreadService):
         super(ConsoleConsumer, self).start_node(node)
 
     def stop_node(self, node):
-        node.account.kill_process("java", allow_fail=False)
+        node.account.kill_process("java", allow_fail=True)
 
     def clean_node(self, node):
-        node.account.ssh("rm -rf /mnt/console_consumer.properties /mnt/consumer.log", allow_fail=False)
+        node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/be633a71/tests/kafkatest/services/templates/console_consumer.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer.properties b/tests/kafkatest/services/templates/console_consumer.properties
index 63782fc..944c2c9 100644
--- a/tests/kafkatest/services/templates/console_consumer.properties
+++ b/tests/kafkatest/services/templates/console_consumer.properties
@@ -14,6 +14,6 @@
 # limitations under the License.
 # see kafka.server.KafkaConfig for additional details and defaults
 
-{% if consumer_timeout_ms is defined %}
+{% if consumer_timeout_ms is not none %}
 consumer.timeout.ms={{ consumer_timeout_ms }}
 {% endif %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/be633a71/tests/kafkatest/services/templates/console_consumer_log4j.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/templates/console_consumer_log4j.properties b/tests/kafkatest/services/templates/console_consumer_log4j.properties
new file mode 100644
index 0000000..e63e6d6
--- /dev/null
+++ b/tests/kafkatest/services/templates/console_consumer_log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define the root logger with appender file
+log4j.rootLogger = INFO, FILE
+
+log4j.appender.FILE=org.apache.log4j.FileAppender
+log4j.appender.FILE.File={{ log_file }}
+log4j.appender.FILE.ImmediateFlush=true
+log4j.appender.FILE.Threshold=debug
+# Set the append to false, overwrite
+log4j.appender.FILE.Append=false
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.conversionPattern=[%d] %p %m (%c)%n
\ No newline at end of file