You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/09/28 20:03:32 UTC

kafka git commit: KAFKA-5888; System test to check ordering of messages with transactions and max.in.flight > 1

Repository: kafka
Updated Branches:
  refs/heads/trunk 89ba0c152 -> dd6347a5d


KAFKA-5888; System test to check ordering of messages with transactions and max.in.flight > 1

To check ordering, we augment the existing transactions test to read from and write to topics with a single partition. Since we write monotonically increasing numbers, each topic should always be sorted, which makes it easy to check for out-of-order messages.

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3969 from apurvam/KAFKA-5888-system-test-which-check-ordering


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd6347a5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd6347a5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd6347a5

Branch: refs/heads/trunk
Commit: dd6347a5df5299efeeae27d5b196182058027502
Parents: 89ba0c1
Author: Apurva Mehta <ap...@confluent.io>
Authored: Thu Sep 28 13:03:07 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Sep 28 13:03:07 2017 -0700

----------------------------------------------------------------------
 tests/kafkatest/tests/core/transactions_test.py | 124 +++++++++++--------
 1 file changed, 74 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd6347a5/tests/kafkatest/tests/core/transactions_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index d8f9e5c..0914844 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -47,53 +47,35 @@ class TransactionsTest(Test):
         self.num_output_partitions = 3
         self.num_seed_messages = 100000
         self.transaction_size = 750
-        self.first_transactional_id = "my-first-transactional-id"
-        self.second_transactional_id = "my-second-transactional-id"
         self.consumer_group = "transactions-test-consumer-group"
 
         self.zk = ZookeeperService(test_context, num_nodes=1)
         self.kafka = KafkaService(test_context,
                                   num_nodes=self.num_brokers,
-                                  zk=self.zk,
-                                  topics = {
-                                      self.input_topic: {
-                                          "partitions": self.num_input_partitions,
-                                          "replication-factor": 3,
-                                          "configs": {
-                                              "min.insync.replicas": 2
-                                          }
-                                      },
-                                      self.output_topic: {
-                                          "partitions": self.num_output_partitions,
-                                          "replication-factor": 3,
-                                          "configs": {
-                                              "min.insync.replicas": 2
-                                          }
-                                      }
-                                  })
+                                  zk=self.zk)
 
     def setUp(self):
         self.zk.start()
 
-    def seed_messages(self):
+    def seed_messages(self, topic, num_seed_messages):
         seed_timeout_sec = 10000
         seed_producer = VerifiableProducer(context=self.test_context,
                                            num_nodes=1,
                                            kafka=self.kafka,
-                                           topic=self.input_topic,
+                                           topic=topic,
                                            message_validator=is_int,
-                                           max_messages=self.num_seed_messages,
+                                           max_messages=num_seed_messages,
                                            enable_idempotence=True)
         seed_producer.start()
-        wait_until(lambda: seed_producer.num_acked >= self.num_seed_messages,
+        wait_until(lambda: seed_producer.num_acked >= num_seed_messages,
                    timeout_sec=seed_timeout_sec,
                    err_msg="Producer failed to produce messages %d in  %ds." %\
                    (self.num_seed_messages, seed_timeout_sec))
         return seed_producer.acked
 
-    def get_messages_from_output_topic(self):
-        consumer = self.start_consumer(self.output_topic, group_id="verifying_consumer")
-        return self.drain_consumer(consumer)
+    def get_messages_from_topic(self, topic, num_messages):
+        consumer = self.start_consumer(topic, group_id="verifying_consumer")
+        return self.drain_consumer(consumer, num_messages)
 
     def bounce_brokers(self, clean_shutdown):
        for node in self.kafka.nodes:
@@ -107,16 +89,16 @@ class TransactionsTest(Test):
                            hard-killed broker %s" % str(node.account))
                 self.kafka.start_node(node)
 
-    def create_and_start_message_copier(self, input_partition, transactional_id):
+    def create_and_start_message_copier(self, input_topic, input_partition, output_topic, transactional_id):
         message_copier = TransactionalMessageCopier(
             context=self.test_context,
             num_nodes=1,
             kafka=self.kafka,
             transactional_id=transactional_id,
             consumer_group=self.consumer_group,
-            input_topic=self.input_topic,
+            input_topic=input_topic,
             input_partition=input_partition,
-            output_topic=self.output_topic,
+            output_topic=output_topic,
             max_messages=-1,
             transaction_size=self.transaction_size
         )
@@ -137,16 +119,15 @@ class TransactionsTest(Test):
                                                         str(copier.progress_percent())))
                 copier.restart(clean_shutdown)
 
-    def create_and_start_copiers(self):
+    def create_and_start_copiers(self, input_topic, output_topic, num_copiers):
         copiers = []
-        copiers.append(self.create_and_start_message_copier(
-            input_partition=0,
-            transactional_id=self.first_transactional_id
-        ))
-        copiers.append(self.create_and_start_message_copier(
-            input_partition=1,
-            transactional_id=self.second_transactional_id
-        ))
+        for i in range(0, num_copiers):
+            copiers.append(self.create_and_start_message_copier(
+                input_topic=input_topic,
+                output_topic=output_topic,
+                input_partition=i,
+                transactional_id="copier-" + str(i)
+            ))
         return copiers
 
     def start_consumer(self, topic_to_read, group_id):
@@ -167,7 +148,7 @@ class TransactionsTest(Test):
                    60)
         return consumer
 
-    def drain_consumer(self, consumer):
+    def drain_consumer(self, consumer, num_messages):
         # wait until we read at least the expected number of messages.
         # This is a safe check because both failure modes will be caught:
         #  1. If we have 'num_seed_messages' but there are duplicates, then
@@ -175,14 +156,16 @@ class TransactionsTest(Test):
         #
         #  2. If we never reach 'num_seed_messages', then this will cause the
         #     test to fail.
-        wait_until(lambda: len(consumer.messages_consumed[1]) >= self.num_seed_messages,
+        wait_until(lambda: len(consumer.messages_consumed[1]) >= num_messages,
                    timeout_sec=90,
                    err_msg="Consumer consumed only %d out of %d messages in %ds" %\
-                   (len(consumer.messages_consumed[1]), self.num_seed_messages, 90))
+                   (len(consumer.messages_consumed[1]), num_messages, 90))
         consumer.stop()
         return consumer.messages_consumed[1]
 
-    def copy_messages_transactionally(self, failure_mode, bounce_target):
+    def copy_messages_transactionally(self, failure_mode, bounce_target,
+                                      input_topic, output_topic,
+                                      num_copiers, num_messages_to_copy):
         """Copies messages transactionally from the seeded input topic to the
         output topic, either bouncing brokers or clients in a hard and soft
         way as it goes.
@@ -192,8 +175,10 @@ class TransactionsTest(Test):
 
         It returns the concurrently consumed messages.
         """
-        copiers = self.create_and_start_copiers()
-        concurrent_consumer = self.start_consumer(self.output_topic,
+        copiers = self.create_and_start_copiers(input_topic=input_topic,
+                                                output_topic=output_topic,
+                                                num_copiers=num_copiers)
+        concurrent_consumer = self.start_consumer(output_topic,
                                                   group_id="concurrent_consumer")
         clean_shutdown = False
         if failure_mode == "clean_bounce":
@@ -210,22 +195,57 @@ class TransactionsTest(Test):
                        err_msg="%s - Failed to copy all messages in  %ds." %\
                        (copier.transactional_id, 120))
         self.logger.info("finished copying messages")
-        return self.drain_consumer(concurrent_consumer)
+
+        return self.drain_consumer(concurrent_consumer, num_messages_to_copy)
+
+    def setup_topics(self):
+        self.kafka.topics = {
+            self.input_topic: {
+                "partitions": self.num_input_partitions,
+                "replication-factor": 3,
+                "configs": {
+                    "min.insync.replicas": 2
+                }
+            },
+            self.output_topic: {
+                "partitions": self.num_output_partitions,
+                "replication-factor": 3,
+                "configs": {
+                    "min.insync.replicas": 2
+                }
+            }
+        }
 
     @cluster(num_nodes=9)
     @matrix(failure_mode=["hard_bounce", "clean_bounce"],
-            bounce_target=["brokers", "clients"])
-    def test_transactions(self, failure_mode, bounce_target):
+            bounce_target=["brokers", "clients"],
+            check_order=[True, False])
+    def test_transactions(self, failure_mode, bounce_target, check_order):
         security_protocol = 'PLAINTEXT'
         self.kafka.security_protocol = security_protocol
         self.kafka.interbroker_security_protocol = security_protocol
         self.kafka.logs["kafka_data_1"]["collect_default"] = True
         self.kafka.logs["kafka_data_2"]["collect_default"] = True
         self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = True
+        if check_order:
+            # To check ordering, we simply create input and output topics
+            # with a single partition.
+            # We reduce the number of seed messages to copy to account for the fewer output
+            # partitions, and thus lower parallelism. This helps keep the test
+            # time shorter.
+            self.num_seed_messages = self.num_seed_messages / 3
+            self.num_input_partitions = 1
+            self.num_output_partitions = 1
+
+        self.setup_topics()
         self.kafka.start()
-        input_messages = self.seed_messages()
-        concurrently_consumed_messages = self.copy_messages_transactionally(failure_mode, bounce_target)
-        output_messages = self.get_messages_from_output_topic()
+
+        input_messages = self.seed_messages(self.input_topic, self.num_seed_messages)
+        concurrently_consumed_messages = self.copy_messages_transactionally(
+            failure_mode, bounce_target, input_topic=self.input_topic,
+            output_topic=self.output_topic, num_copiers=self.num_input_partitions,
+            num_messages_to_copy=self.num_seed_messages)
+        output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages)
 
         concurrently_consumed_message_set = set(concurrently_consumed_messages)
         output_message_set = set(output_messages)
@@ -242,3 +262,7 @@ class TransactionsTest(Test):
         assert input_message_set == concurrently_consumed_message_set, \
             "Input and concurrently consumed output message sets are not equal. Num input messages: %d. Num concurrently_consumed_messages: %d" %\
             (len(input_message_set), len(concurrently_consumed_message_set))
+        if check_order:
+            assert input_messages == sorted(input_messages), "The seed messages themselves were not in order"
+            assert output_messages == input_messages, "Output messages are not in order"
+            assert concurrently_consumed_messages == output_messages, "Concurrently consumed messages are not in order"