You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/13 00:13:59 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests

mjsax commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r668337811



##########
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##########
@@ -128,45 +128,61 @@ def run_failure_and_recovery(self, processor1, processor2, processor3, verifier)
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
     def add_streams(self, processor):
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            self.wait_for_startup(monitor, processor)
+        with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+            with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
+                processor.start()
+                self.wait_for_running(stdout_monitor, processor)
+                self.wait_for_commit(log_monitor, processor)
 
     def add_streams2(self, running_processor, processor_to_be_started):
-        with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
-            self.add_streams(processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor)
+        with running_processor.node.account.monitor_log(running_processor.LOG_FILE) as log_monitor:
+            with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as stdout_monitor:
+                self.add_streams(processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor)
+                self.wait_for_commit(log_monitor, running_processor)
 
     def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
-        with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
-            self.add_streams2(running_processor2, processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor1)
+        with running_processor1.node.account.monitor_log(running_processor1.LOG_FILE) as log_monitor:
+            with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as stdout_monitor:
+                self.add_streams2(running_processor2, processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor1)
+                self.wait_for_commit(log_monitor, running_processor1)
 
     def stop_streams(self, processor_to_be_stopped):
         with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
             processor_to_be_stopped.stop()
             self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
 
     def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
-        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
-            self.stop_streams(processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor)
+        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.LOG_FILE) as log_monitor:
+            with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as stdout_monitor:
+                self.stop_streams(processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor)
+                self.wait_for_commit(log_monitor, keep_alive_processor)
 
     def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
-            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor1)
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor:
+            with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor:
+                self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor1)
+                self.wait_for_commit(log_monitor, keep_alive_processor1)
 
     def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
-            with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
-                processor_to_be_aborted.stop_nodes(False)
-            self.wait_for_startup(monitor2, keep_alive_processor2)
-        self.wait_for_startup(monitor1, keep_alive_processor1)
-
-    def wait_for_startup(self, monitor, processor):
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor1:
+            with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor1:
+                with keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor2:
+                    with keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor2:
+                        processor_to_be_aborted.stop_nodes(False)
+                        self.wait_for_running(stdout_monitor2, keep_alive_processor2)
+                        self.wait_for_running(stdout_monitor1, keep_alive_processor1)
+                        self.wait_for_commit(log_monitor2, keep_alive_processor2)
+                        self.wait_for_commit(log_monitor1, keep_alive_processor1)
+
+    def wait_for_running(self, monitor, processor):
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
-        self.wait_for(monitor, processor, "processed [0-9]* records from topic")
+
+    def wait_for_commit(self, monitor, processor):
+        self.wait_for(monitor, processor, "Committed all active tasks \[[0-9_,]+\] and standby tasks \[[0-9_,]+\]")

Review comment:
       Is this also logged if there is nothing to be committed? Just double-checking to make sure we don't introduce a new issue -- IIRC, we only commit if there is something to commit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org