You are viewing a plain text version of this content. (The canonical link to the original page was not preserved in this plain-text copy.)
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/29 00:49:12 UTC

kafka git commit: KAFKA-3382: Add system test for ReplicationVerificationTool

Repository: kafka
Updated Branches:
  refs/heads/trunk 346df7273 -> 0ada3b1fc


KAFKA-3382: Add system test for ReplicationVerificationTool

Author: Ashish Singh <as...@cloudera.com>

Reviewers: Geoff Anderson <ge...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1160 from SinghAsDev/KAFKA-3382


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ada3b1f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ada3b1f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ada3b1f

Branch: refs/heads/trunk
Commit: 0ada3b1fc215bb8efdf5c7ae27eb52b29e0fbbdc
Parents: 346df72
Author: Ashish Singh <as...@cloudera.com>
Authored: Thu Apr 28 15:49:01 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Apr 28 15:49:01 2016 -0700

----------------------------------------------------------------------
 .../services/replica_verification_tool.py       | 81 ++++++++++++++++++
 tests/kafkatest/services/verifiable_producer.py |  8 +-
 .../tests/tools/replica_verification_test.py    | 88 ++++++++++++++++++++
 3 files changed, 176 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0ada3b1f/tests/kafkatest/services/replica_verification_tool.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/replica_verification_tool.py b/tests/kafkatest/services/replica_verification_tool.py
new file mode 100644
index 0000000..f6374fb
--- /dev/null
+++ b/tests/kafkatest/services/replica_verification_tool.py
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.services.kafka.directory import kafka_dir
+from kafkatest.services.security.security_config import SecurityConfig
+
+import re
+
+class ReplicaVerificationTool(BackgroundThreadService):
+    """
+    Runs kafka.tools.ReplicaVerificationTool against a topic and records the most
+    recent "max lag" it reports for each topic-partition.
+    """
+
+    # The tool's stderr and stdout are both appended to this file (see start_cmd).
+    LOG_FILE = "/mnt/replica_verification_tool.log"
+
+    logs = {
+        "replica_verification_tool_log": {
+            "path": LOG_FILE,
+            "collect_default": False}
+    }
+
+    # Matches report lines like "... max lag is 0 for partition [some-topic,0] at ...".
+    # group(1) is the lag; group(2) is the "topic,partition" key stored in partition_lag.
+    MAX_LAG_PATTERN = re.compile(r"max lag is (-?\d+) for partition \[(.+?)\] at")
+
+    def __init__(self, context, num_nodes, kafka, topic, report_interval_ms, security_protocol="PLAINTEXT"):
+        """
+        Args:
+            context:            ducktape test context
+            num_nodes:          number of nodes to run the tool on
+            kafka:              Kafka service whose brokers are verified
+            topic:              topic passed to the tool's --topic-white-list
+            report_interval_ms: value for the tool's --report-interval-ms
+            security_protocol:  protocol used to select the brokers' bootstrap servers
+        """
+        super(ReplicaVerificationTool, self).__init__(context, num_nodes)
+
+        self.kafka = kafka
+        self.topic = topic
+        self.report_interval_ms = report_interval_ms
+        self.security_protocol = security_protocol
+        self.security_config = SecurityConfig(security_protocol)
+        # "topic,partition" -> latest max lag parsed from the tool's output
+        self.partition_lag = {}
+
+    def _worker(self, idx, node):
+        """Run the tool on node and record the max lag from every report line it prints."""
+        cmd = self.start_cmd(node)
+        self.logger.debug("ReplicaVerificationTool %d command: %s" % (idx, cmd))
+        self.security_config.setup_node(node)
+        for line in node.account.ssh_capture(cmd):
+            self.logger.debug("Parsing line:{}".format(line))
+
+            parsed = self.MAX_LAG_PATTERN.search(line)
+            if parsed:
+                lag = int(parsed.group(1))
+                topic_partition = parsed.group(2)
+                self.logger.debug("Setting max lag for {} as {}".format(topic_partition, lag))
+                self.partition_lag[topic_partition] = lag
+
+    def get_lag_for_partition(self, topic, partition):
+        """
+        Get latest lag for given topic-partition
+
+        Args:
+            topic:          a topic
+            partition:      a partition of the topic
+
+        Returns:
+            The most recently reported max lag, or -1 if the tool has not reported on
+            this topic-partition yet. Returning -1 rather than raising KeyError lets
+            callers poll this from a wait_until() condition while the tool starts up.
+        """
+        topic_partition = topic + ',' + str(partition)
+        lag = self.partition_lag.get(topic_partition, -1)
+        self.logger.debug("Returning lag for {} as {}".format(topic_partition, lag))
+        return lag
+
+    def start_cmd(self, node):
+        """Build the shell command that runs the tool in the background, appending its output to LOG_FILE."""
+        cmd = "/opt/%s/bin/" % kafka_dir(node)
+        cmd += "kafka-run-class.sh kafka.tools.ReplicaVerificationTool"
+        # --time -2 makes the tool start from the earliest available offsets.
+        cmd += " --broker-list %s --topic-white-list %s --time -2 --report-interval-ms %s" % (self.kafka.bootstrap_servers(self.security_protocol), self.topic, self.report_interval_ms)
+
+        cmd += " 2>> %s | tee -a %s &" % (self.LOG_FILE, self.LOG_FILE)
+        return cmd
+
+    def stop_node(self, node):
+        # NOTE(review): kill_process matches on "java", so this likely also stops any other JVM on the node.
+        node.account.kill_process("java", clean_shutdown=True, allow_fail=True)
+
+    def clean_node(self, node):
+        node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf %s" % self.LOG_FILE, allow_fail=False)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ada3b1f/tests/kafkatest/services/verifiable_producer.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py
index 414da84..500410f 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -43,7 +43,8 @@ class VerifiableProducer(BackgroundThreadService):
         }
 
     def __init__(self, context, num_nodes, kafka, topic, max_messages=-1, throughput=100000,
-                 message_validator=is_int, compression_types=None, version=TRUNK):
+                 message_validator=is_int, compression_types=None, version=TRUNK, acks=None):
         """
+        :param acks if not None, the producer acks setting; it is added to the producer config and passed as --acks
         :param max_messages is a number of messages to be produced per producer
         :param message_validator checks for an expected format of messages produced. There are
@@ -71,6 +72,7 @@ class VerifiableProducer(BackgroundThreadService):
         self.acked_values = []
         self.not_acked_values = []
         self.produced_count = {}
+        self.acks = acks
 
 
     @property
@@ -96,6 +98,10 @@ class VerifiableProducer(BackgroundThreadService):
 
         # Create and upload config file
         producer_prop_file = self.prop_file(node)
+        # Only touch the generated config when acks was explicitly requested.
+        if self.acks is not None:
+            self.logger.info("VerifiableProducer (index = %d) will use acks = %s", idx, self.acks)
+            producer_prop_file += "\nacks=%s\n" % self.acks
         self.logger.info("verifiable_producer.properties:")
         self.logger.info(producer_prop_file)
         node.account.create_file(VerifiableProducer.CONFIG_FILE, producer_prop_file)
@@ -156,6 +162,9 @@ class VerifiableProducer(BackgroundThreadService):
             cmd += " --throughput %s" % str(self.throughput)
         if self.message_validator == is_int_with_prefix:
             cmd += " --value-prefix %s" % str(idx)
+        # No trailing newline: the remaining options and redirection must stay on the same shell line.
+        if self.acks is not None:
+            cmd += " --acks %s" % str(self.acks)
 
         cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ada3b1f/tests/kafkatest/tests/tools/replica_verification_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/tools/replica_verification_test.py b/tests/kafkatest/tests/tools/replica_verification_test.py
new file mode 100644
index 0000000..1b625e9
--- /dev/null
+++ b/tests/kafkatest/tests/tools/replica_verification_test.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ducktape.utils.util import wait_until
+from ducktape.tests.test import Test
+from kafkatest.services.verifiable_producer import VerifiableProducer
+
+from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.services.kafka import KafkaService
+from kafkatest.services.replica_verification_tool import ReplicaVerificationTool
+
+TOPIC = "topic-replica-verification"
+REPORT_INTERVAL_MS = 1000
+
+class ReplicaVerificationToolTest(Test):
+    """
+    Tests ReplicaVerificationTool against a 2-broker cluster with one topic of
+    replication factor 2.
+    """
+    def __init__(self, test_context):
+        super(ReplicaVerificationToolTest, self).__init__(test_context)
+        self.num_zk = 1
+        self.num_brokers = 2
+        self.topics = {
+            TOPIC: {'partitions': 1, 'replication-factor': 2}
+        }
+
+        self.zk = ZookeeperService(test_context, self.num_zk)
+        self.kafka = None
+        self.producer = None
+        self.replica_verifier = None
+
+    def setUp(self):
+        self.zk.start()
+
+    def start_kafka(self, security_protocol, interbroker_security_protocol):
+        self.kafka = KafkaService(
+            self.test_context, self.num_brokers,
+            self.zk, security_protocol=security_protocol,
+            interbroker_security_protocol=interbroker_security_protocol, topics=self.topics)
+        self.kafka.start()
+
+    def start_replica_verification_tool(self, security_protocol):
+        self.replica_verifier = ReplicaVerificationTool(self.test_context, 1, self.kafka, TOPIC, report_interval_ms=REPORT_INTERVAL_MS, security_protocol=security_protocol)
+        self.replica_verifier.start()
+
+    def start_producer(self, max_messages, acks, timeout):
+        """Start a VerifiableProducer and, unless acks == 0, wait until max_messages are acked."""
+        self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=TOPIC, throughput=1000, acks=acks, max_messages=max_messages)
+        current_acked = self.producer.num_acked
+        self.logger.info("current_acked = %s" % current_acked)
+        self.producer.start()
+        # With acks=0 brokers send no acknowledgement, so there is nothing to wait for.
+        wait_until(lambda: acks == 0 or self.producer.num_acked >= current_acked + max_messages, timeout_sec=timeout,
+                   err_msg="Timeout awaiting messages to be produced and acked")
+
+    def stop_producer(self):
+        self.producer.stop()
+
+    def _current_lag(self):
+        """
+        Latest lag reported for partition 0 of TOPIC, or -1 if the tool has not reported it yet.
+        Tolerating KeyError keeps wait_until() polling instead of failing while the tool starts up.
+        """
+        try:
+            return self.replica_verifier.get_lag_for_partition(TOPIC, 0)
+        except KeyError:
+            return -1
+
+    def test_replica_lags(self, security_protocol='PLAINTEXT'):
+        """
+        Produce with acks=-1 and expect ReplicaVerificationTool to report zero lag, then
+        produce with acks=0 and expect it to report a non-zero lag.
+        :return: None
+        """
+        self.start_kafka(security_protocol, security_protocol)
+        self.start_replica_verification_tool(security_protocol)
+        self.start_producer(max_messages=10, acks=-1, timeout=15)
+        # Verify that there is no lag in replicas and is correctly reported by ReplicaVerificationTool
+        wait_until(lambda: self._current_lag() == 0, timeout_sec=10,
+                   err_msg="Timed out waiting to reach zero replica lags.")
+        self.stop_producer()
+
+        self.start_producer(max_messages=1000, acks=0, timeout=5)
+        # Verify that there is lag in replicas and is correctly reported by ReplicaVerificationTool
+        wait_until(lambda: self._current_lag() > 0, timeout_sec=10,
+                   err_msg="Timed out waiting to reach non-zero number of replica lags.")
\ No newline at end of file