You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/01 08:55:20 UTC

[GitHub] [kafka] tombentley commented on a diff in pull request #11783: KAFKA-10000: System tests (KIP-618)

tombentley commented on code in PR #11783:
URL: https://github.com/apache/kafka/pull/11783#discussion_r911757853


##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -519,6 +532,91 @@ def test_bounce(self, clean, connect_protocol):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
+    @cluster(num_nodes=6)
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_exactly_once_source(self, clean, connect_protocol):
+        """
+        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo bounces, both clean and unclean.
+        """
+        num_tasks = 3
+
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled'
+        self.CONNECT_PROTOCOL = connect_protocol
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100, complete_records=True)
+        self.source.start()
+
+        for _ in range(3):
+            for node in self.cc.nodes:

Review Comment:
   I wonder if we really should restart the nodes in the same order each time.



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -519,6 +532,91 @@ def test_bounce(self, clean, connect_protocol):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
+    @cluster(num_nodes=6)
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_exactly_once_source(self, clean, connect_protocol):
+        """
+        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo bounces, both clean and unclean.
+        """
+        num_tasks = 3
+
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled'
+        self.CONNECT_PROTOCOL = connect_protocol
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100, complete_records=True)
+        self.source.start()
+
+        for _ in range(3):
+            for node in self.cc.nodes:
+                started = time.time()
+                self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account))
+                self.cc.stop_node(node, clean_shutdown=clean)
+                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
+                    self.cc.start_node(node)
+                    monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
+                                       err_msg="Kafka Connect worker didn't successfully join group and start work")
+                self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
+
+                # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+                # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+                # be shut down before they have any time to process data and we can end up with zero data making it
+                # through the test.
+                time.sleep(15)

Review Comment:
   Agreed that no data making it through the test is something to avoid, but I think always giving the consumer groups time to recover is a lenient way of achieving that. Perhaps the 2nd restart should not have the timeout, precisely to ensure that the messy case still doesn't cause problems. 



##########
tests/kafkatest/tests/connect/connect_distributed_test.py:
##########
@@ -519,6 +532,91 @@ def test_bounce(self, clean, connect_protocol):
 
         assert success, "Found validation errors:\n" + "\n  ".join(errors)
 
+    @cluster(num_nodes=6)
+    @matrix(clean=[True, False], connect_protocol=['sessioned', 'compatible', 'eager'])
+    def test_exactly_once_source(self, clean, connect_protocol):
+        """
+        Validates that source and sink tasks that run continuously and produce a predictable sequence of messages
+        run correctly and deliver messages exactly once when Kafka Connect workers undergo bounces, both clean and unclean.
+        """
+        num_tasks = 3
+
+        self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled'
+        self.CONNECT_PROTOCOL = connect_protocol
+        self.setup_services()
+        self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))
+        self.cc.start()
+
+        self.source = VerifiableSource(self.cc, topic=self.TOPIC, tasks=num_tasks, throughput=100, complete_records=True)
+        self.source.start()
+
+        for _ in range(3):
+            for node in self.cc.nodes:
+                started = time.time()
+                self.logger.info("%s bouncing Kafka Connect on %s", clean and "Clean" or "Hard", str(node.account))
+                self.cc.stop_node(node, clean_shutdown=clean)
+                with node.account.monitor_log(self.cc.LOG_FILE) as monitor:
+                    self.cc.start_node(node)
+                    monitor.wait_until("Starting connectors and tasks using config offset", timeout_sec=90,
+                                       err_msg="Kafka Connect worker didn't successfully join group and start work")
+                self.logger.info("Bounced Kafka Connect on %s and rejoined in %f seconds", node.account, time.time() - started)
+
+                # Give additional time for the consumer groups to recover. Even if it is not a hard bounce, there are
+                # some cases where a restart can cause a rebalance to take the full length of the session timeout
+                # (e.g. if the client shuts down before it has received the memberId from its initial JoinGroup).
+                # If we don't give enough time for the group to stabilize, the next bounce may cause consumers to
+                # be shut down before they have any time to process data and we can end up with zero data making it
+                # through the test.
+                time.sleep(15)
+
+        # Wait at least scheduled.rebalance.max.delay.ms to expire and rebalance
+        time.sleep(60)
+
+        # Allow the connectors to startup, recover, and exit cleanly before
+        # ending the test. It's possible for the source connector to make
+        # uncommitted progress, and for the sink connector to read messages that
+        # have not been committed yet, and fail a later assertion.
+        wait_until(lambda: self.is_running(self.source), timeout_sec=30,
+                   err_msg="Failed to see connector transition to the RUNNING state")
+        time.sleep(15)
+        self.source.stop()
+        self.cc.stop()
+
+        consumer = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, message_validator=json.loads, consumer_timeout_ms=1000, isolation_level="read_committed")
+        consumer.run()
+        src_messages = consumer.messages_consumed[1]
+
+        success = True
+        errors = []
+        for task in range(num_tasks):
+            # Validate source messages
+            src_seqnos = [msg['payload']['seqno'] for msg in src_messages if msg['payload']['task'] == task]
+            # Every seqno up to the largest one we ever saw should appear. Each seqno should only appear once because clean
+            # bouncing should commit on rebalance.
+            src_seqno_max = max(src_seqnos) if len(src_seqnos) else 0

Review Comment:
   If `src_seqno_max==0` then we've not really proven that EOS is actually able to make progress in the presence of worker restarts, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org