You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/01 20:55:30 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #11783: KAFKA-10000: System tests (KIP-618)

C0urante commented on code in PR #11783:
URL: https://github.com/apache/kafka/pull/11783#discussion_r912246197


##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -519,6 +532,91 @@ def test_bounce(self, clean, connect_protocol):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
+    @cluster(num_nodes=6)
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_exactly_once_source(self, clean, connect_protocol):
+        """
+        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo bounces, both clean and unclean.
+        """
+        num_tasks = 3
+
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled'
+        self.CONNECT_PROTOCOL = connect_protocol
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100, complete_records=True)
+        self.source.start()
+
+        for _ in range(3):
+            for node in self.cc.nodes:
+                started = time.time()
+                self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account))
+                self.cc.stop_node(node, clean_shutdown=clean)
+                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
+                    self.cc.start_node(node)
+                    monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
+                                       err_msg="Kafka Connect worker didn't successfully join group and start work")
+                self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
+
+                # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+                # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+                # be shut down before they have any time to process data and we can end up with zero data making it
+                # through the test.
+                time.sleep(15)

Review Comment:
   This is also copied from the [test_bounces](https://github.com/apache/kafka/blob/bad475166f0f0b27ec78d6ebe921b95749c4a3c1/tests/kafkatest/tests/connect/connect_distributed_test.py#L428-L434) case. It should probably be updated to not refer to consumer groups, but instead, worker rebalances and task startups.
   
   I guess if we want to be rigorous here, we could do some rolling bounces with this cushioning in place, and some without. Perhaps two rounds of each?



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -519,6 +532,91 @@ def test_bounce(self, clean, connect_protocol):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
+    @cluster(num_nodes=6)
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_exactly_once_source(self, clean, connect_protocol):
+        """
+        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo bounces, both clean and unclean.
+        """
+        num_tasks = 3
+
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled'
+        self.CONNECT_PROTOCOL = connect_protocol
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100, complete_records=True)
+        self.source.start()
+
+        for _ in range(3):
+            for node in self.cc.nodes:

Review Comment:
   For context, this follows the same logic in the [test_bounces](https://github.com/apache/kafka/blob/bad475166f0f0b27ec78d6ebe921b95749c4a3c1/tests/kafkatest/tests/connect/connect_distributed_test.py#L417-L421) case. I guess we could do something a little less repetitive; I've offset the order by 1 with each successive restart.



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -519,6 +532,91 @@ def test_bounce(self, clean, connect_protocol):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
+    @cluster(num_nodes=6)
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_exactly_once_source(self, clean, connect_protocol):
+        """
+        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo bounces, both clean and unclean.
+        """
+        num_tasks = 3
+
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled'
+        self.CONNECT_PROTOCOL = connect_protocol
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100, complete_records=True)
+        self.source.start()
+
+        for _ in range(3):
+            for node in self.cc.nodes:
+                started = time.time()
+                self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account))
+                self.cc.stop_node(node, clean_shutdown=clean)
+                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
+                    self.cc.start_node(node)
+                    monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
+                                       err_msg="Kafka Connect worker didn't successfully join group and start work")
+                self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
+
+                # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+                # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+                # be shut down before they have any time to process data and we can end up with zero data making it
+                # through the test.
+                time.sleep(15)
+
+        # Wait at least scheduled.rebalance.max.delay.ms to expire and rebalance
+        time.sleep(60)
+
+        # Allow the connectors to startup, recover, and exit cleanly before
+        # ending the test. It's possible for the source connector to make
+        # uncommitted progress, and for the sink connector to read messages that
+        # have not been committed yet, and fail a later assertion.
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        time.sleep(15)
+        self.source.stop()
+        self.cc.stop()
+
+        consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, isolation_level="read_committed")
+        consumer.run()
+        src_messages = consumer.messages_consumed[1]
+
+        success = True
+        errors = []
+        for task in range(num_tasks):
+            # Validate source messages
+            src_seqnos = [msg['payload']['seqno'] for msg in src_messages if msg['payload']['task'] == task]
+            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
+            # bouncing should commit on rebalance.
+            src_seqno_max = max(src_seqnos) if len(src_seqnos) else 0

Review Comment:
   Yeah, that's correct. Added a check for that case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org