You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/01/24 01:59:44 UTC

kafka git commit: KAFKA-2846: Add Ducktape test for kafka-consumer-groups

Repository: kafka
Updated Branches:
  refs/heads/trunk bc9237701 -> 4f39b5bc5


KAFKA-2846: Add Ducktape test for kafka-consumer-groups

Author: Ashish Singh <as...@cloudera.com>

Reviewers: Geoff Anderson <ge...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #555 from SinghAsDev/KAFKA-2846


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4f39b5bc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4f39b5bc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4f39b5bc

Branch: refs/heads/trunk
Commit: 4f39b5bc5b2104fb39ab5b0c087fe84a71205a74
Parents: bc92377
Author: Ashish Singh <as...@cloudera.com>
Authored: Sat Jan 23 16:59:28 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Jan 23 16:59:28 2016 -0800

----------------------------------------------------------------------
 tests/kafkatest/services/kafka/kafka.py         |  50 +++++++++
 .../tests/consumer_group_command_test.py        | 105 +++++++++++++++++++
 2 files changed, 155 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4f39b5bc/tests/kafkatest/services/kafka/kafka.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index cb5018c..b9105df 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -413,6 +413,56 @@ class KafkaService(JmxMixin, Service):
         self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx))
         return self.get_node(leader_idx)
 
+    def list_consumer_groups(self, node=None, new_consumer=False, command_config=None):
+        """ Get list of consumer groups.
+        """
+        if node is None:
+            node = self.nodes[0]
+
+        if command_config is None:
+            command_config = ""
+        else:
+            command_config = "--command-config " + command_config
+
+        if new_consumer:
+            cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --list" % \
+                  (kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config)
+        else:
+            cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --list" % \
+                  (kafka_dir(node), self.zk.connect_setting(), command_config)
+        output = ""
+        self.logger.debug(cmd)
+        for line in node.account.ssh_capture(cmd):
+            if not line.startswith("SLF4J"):
+                output += line
+        self.logger.debug(output)
+        return output
+
+    def describe_consumer_group(self, group, node=None, new_consumer=False, command_config=None):
+        """ Describe a consumer group.
+        """
+        if node is None:
+            node = self.nodes[0]
+
+        if command_config is None:
+            command_config = ""
+        else:
+            command_config = "--command-config " + command_config
+
+        if new_consumer:
+            cmd = "/opt/%s/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server %s %s --group %s --describe" % \
+                  (kafka_dir(node), self.bootstrap_servers(self.security_protocol), command_config, group)
+        else:
+            cmd = "/opt/%s/bin/kafka-consumer-groups.sh --zookeeper %s %s --group %s --describe" % \
+                  (kafka_dir(node), self.zk.connect_setting(), command_config, group)
+        output = ""
+        self.logger.debug(cmd)
+        for line in node.account.ssh_capture(cmd):
+            if not (line.startswith("SLF4J") or line.startswith("GROUP, TOPIC") or line.startswith("Could not fetch offset")):
+                output += line
+        self.logger.debug(output)
+        return output
+
     def bootstrap_servers(self, protocol='PLAINTEXT'):
         """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4f39b5bc/tests/kafkatest/tests/consumer_group_command_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/consumer_group_command_test.py b/tests/kafkatest/tests/consumer_group_command_test.py
new file mode 100644
index 0000000..a7b43a1
--- /dev/null
+++ b/tests/kafkatest/tests/consumer_group_command_test.py
@@ -0,0 +1,105 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from ducktape.mark import matrix
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.console_consumer import ConsoleConsumer
+from kafkatest.services.security.security_config import SecurityConfig
+
+import os
+
+TOPIC = "topic-consumer-group-command"
+
+class ConsumerGroupCommandTest(Test):
+    """
+    Tests ConsumerGroupCommand
+    """
+    # Root directory for persistent output
+    PERSISTENT_ROOT = "/mnt/consumer_group_command"
+    COMMAND_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "command.properties")
+
+    def __init__(self, test_context):
+        super(ConsumerGroupCommandTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 1
+        self.topics = {
+            TOPIC: {'partitions': 1, 'replication-factor': 1}
+        }
+        self.zk = ZookeeperService(test_context, self.num_zk)
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+        self.kafka.start()
+
+    def start_consumer(self, security_protocol):
+        enable_new_consumer = security_protocol == SecurityConfig.SSL
+        self.consumer = ConsoleConsumer(self.test_context, num_nodes=self.num_brokers, kafka=self.kafka, topic=TOPIC,
+                                        consumer_timeout_ms=None, new_consumer=enable_new_consumer)
+        self.consumer.start()
+
+    def setup_and_verify(self, security_protocol, group=None):
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_consumer(security_protocol)
+        consumer_node = self.consumer.nodes[0]
+        wait_until(lambda: self.consumer.alive(consumer_node),
+                   timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
+        kafka_node = self.kafka.nodes[0]
+        if security_protocol is not SecurityConfig.PLAINTEXT:
+            prop_file = str(self.kafka.security_config.client_config())
+            self.logger.debug(prop_file)
+            kafka_node.account.ssh("mkdir -p %s" % self.PERSISTENT_ROOT, allow_fail=False)
+            kafka_node.account.create_file(self.COMMAND_CONFIG_FILE, prop_file)
+
+        # Verify ConsumerGroupCommand lists expected consumer groups
+        enable_new_consumer = security_protocol != SecurityConfig.PLAINTEXT
+        command_config_file = None
+        if enable_new_consumer:
+            command_config_file = self.COMMAND_CONFIG_FILE
+
+        if group:
+            wait_until(lambda: ("%s, topic-consumer-group-command, 0," % group) in self.kafka.describe_consumer_group(group=group, node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
+                       err_msg="Timed out waiting to list expected consumer groups.")
+        else:
+            wait_until(lambda: "test-consumer-group" in self.kafka.list_consumer_groups(node=kafka_node, new_consumer=enable_new_consumer, command_config=command_config_file), timeout_sec=10,
+                       err_msg="Timed out waiting to list expected consumer groups.")
+
+        self.consumer.stop()
+
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_list_consumer_groups(self, security_protocol='PLAINTEXT'):
+        """
+        Tests if ConsumerGroupCommand is listing correct consumer groups
+        :return: None
+        """
+        self.setup_and_verify(security_protocol)
+
+    @matrix(security_protocol=['PLAINTEXT', 'SSL'])
+    def test_describe_consumer_group(self, security_protocol='PLAINTEXT'):
+        """
+        Tests if ConsumerGroupCommand is describing a consumer group correctly
+        :return: None
+        """
+        self.setup_and_verify(security_protocol, group="test-consumer-group")