You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/30 11:24:04 UTC

[GitHub] [kafka] cadonna opened a new pull request #10944: MINOR: Loose verification of startup in EOS system tests

cadonna opened a new pull request #10944:
URL: https://github.com/apache/kafka/pull/10944


   Currently, we verify that a Streams client transitioned
   from REBALANCING to RUNNING and that it processes some records
   in the EOS system test. However, if the Streams client only
   has standby tasks assigned, the client will never process
   records. Hence, the test will fail although everything is
   fine.
   
   This commit removes the verification that checks whether
   records are processed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r662160463



##########
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##########
@@ -166,7 +166,6 @@ def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_
 
     def wait_for_startup(self, monitor, processor):
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
-        self.wait_for(monitor, processor, "processed [0-9]* records from topic")

Review comment:
       According to the logs the test uses even 2 standby replicas (`num.standby.replicas = 2`). The git history says it was configured that way long time ago on June 8, 2017 by a guy named @mjsax.  🙂  
   
   And even if it wouldn't, there is still the possibility that a warm-up standby task is assigned to the client, before the actual active task is moved to it. Note `with followup probing rebalance` in the logs I posted. We could wait for settlement of the assignment, but that would extend the runtime of the test. I get your point about ensuring to not move to the next step too quickly, though. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna closed pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna closed pull request #10944:
URL: https://github.com/apache/kafka/pull/10944


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r661671240



##########
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##########
@@ -166,7 +166,6 @@ def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_
 
     def wait_for_startup(self, monitor, processor):
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
-        self.wait_for(monitor, processor, "processed [0-9]* records from topic")

Review comment:
       Does this test even use standbys?
   
   IIRC, the idea was to ensure that the instance really started to process data, so we don't move to the next step, too quickly in the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna edited a comment on pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna edited a comment on pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#issuecomment-871339360


   From the logs, I see the following last assignment:
   ```
   2021-06-29 09:57:45,026] INFO Decided on assignment: {
   
   016c5d71-b8a4-4e8a-9896-e757eefb68c6=[
       activeTasks: ([0_0, 0_2, 0_3]) 
       standbyTasks: ([0_1, 0_4]) 
       prevActiveTasks: ([0_0, 0_2]) 
       prevStandbyTasks: ([0_1, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=-2, 0_1=10880, 0_2=-2, 0_3=10428, 0_4=10964]) 
       taskLagTotals: ([0_0=-2, 0_1=280, 0_2=-2, 0_3=24, 0_4=4]) 
       capacity: 1 
       assigned: 5], 
   
   3134dbd9-4def-4574-b027-427ae654d8f5=[
       activeTasks: ([0_1, 0_4]) 
       standbyTasks: ([0_0, 0_2, 0_3]) 
       prevActiveTasks: ([0_1]) 
       prevStandbyTasks: ([0_0, 0_2, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=10226, 0_1=-2, 0_2=6254, 0_3=10448, 0_4=10680]) 
       taskLagTotals: ([0_0=580, 0_1=-2, 0_2=4998, 0_3=4, 0_4=288]) 
       capacity: 1 
       assigned: 5], 
   
   5c2faa18-0876-498a-b00d-9b5bd62297d1=[
       activeTasks: ([]) 
       standbyTasks: ([0_0, 0_1, 0_2, 0_3, 0_4]) 
       prevActiveTasks: ([]) 
       prevStandbyTasks: ([0_0, 0_1, 0_2, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=0, 0_1=0, 0_2=0, 0_3=0, 0_4=2009]) 
       taskLagTotals: ([0_0=10806, 0_1=11160, 0_2=11252, 0_3=10452, 0_4=8959]) 
       capacity: 1 
       assigned: 5]
   } with followup probing rebalance. (org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor)
   ```
   `5c2faa18-0876-498a-b00d-9b5bd62297d1` is the Streams client that is shutdown uncleanly and verified when it restarts. The verification whether `5c2faa18-0876-498a-b00d-9b5bd62297d1` processed records will never be satisfied since the Streams client only has standby tasks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#issuecomment-883418844


   Opened #11090 with an alternative approach to fix the system test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna edited a comment on pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna edited a comment on pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#issuecomment-871339360


   From the logs, I see the following last assignment before the verification:
   ```
   2021-06-29 09:57:45,026] INFO Decided on assignment: {
   
   016c5d71-b8a4-4e8a-9896-e757eefb68c6=[
       activeTasks: ([0_0, 0_2, 0_3]) 
       standbyTasks: ([0_1, 0_4]) 
       prevActiveTasks: ([0_0, 0_2]) 
       prevStandbyTasks: ([0_1, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=-2, 0_1=10880, 0_2=-2, 0_3=10428, 0_4=10964]) 
       taskLagTotals: ([0_0=-2, 0_1=280, 0_2=-2, 0_3=24, 0_4=4]) 
       capacity: 1 
       assigned: 5], 
   
   3134dbd9-4def-4574-b027-427ae654d8f5=[
       activeTasks: ([0_1, 0_4]) 
       standbyTasks: ([0_0, 0_2, 0_3]) 
       prevActiveTasks: ([0_1]) 
       prevStandbyTasks: ([0_0, 0_2, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=10226, 0_1=-2, 0_2=6254, 0_3=10448, 0_4=10680]) 
       taskLagTotals: ([0_0=580, 0_1=-2, 0_2=4998, 0_3=4, 0_4=288]) 
       capacity: 1 
       assigned: 5], 
   
   5c2faa18-0876-498a-b00d-9b5bd62297d1=[
       activeTasks: ([]) 
       standbyTasks: ([0_0, 0_1, 0_2, 0_3, 0_4]) 
       prevActiveTasks: ([]) 
       prevStandbyTasks: ([0_0, 0_1, 0_2, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=0, 0_1=0, 0_2=0, 0_3=0, 0_4=2009]) 
       taskLagTotals: ([0_0=10806, 0_1=11160, 0_2=11252, 0_3=10452, 0_4=8959]) 
       capacity: 1 
       assigned: 5]
   } with followup probing rebalance. (org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor)
   ```
   `5c2faa18-0876-498a-b00d-9b5bd62297d1` is the Streams client that is shutdown uncleanly and verified when it restarts. The verification whether `5c2faa18-0876-498a-b00d-9b5bd62297d1` processed records will never be satisfied since the Streams client only has standby tasks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#issuecomment-884015976


   PR #11090 got merged. I will close this for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r663088560



##########
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##########
@@ -128,45 +128,61 @@ def run_failure_and_recovery(self, processor1, processor2, processor3, verifier)
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
     def add_streams(self, processor):
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            self.wait_for_startup(monitor, processor)
+        with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+            with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
+                processor.start()
+                self.wait_for_running(stdout_monitor, processor)
+                self.wait_for_commit(log_monitor, processor)
 
     def add_streams2(self, running_processor, processor_to_be_started):
-        with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
-            self.add_streams(processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor)
+        with running_processor.node.account.monitor_log(running_processor.LOG_FILE) as log_monitor:
+            with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as stdout_monitor:
+                self.add_streams(processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor)
+                self.wait_for_commit(log_monitor, running_processor)
 
     def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
-        with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
-            self.add_streams2(running_processor2, processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor1)
+        with running_processor1.node.account.monitor_log(running_processor1.LOG_FILE) as log_monitor:
+            with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as stdout_monitor:
+                self.add_streams2(running_processor2, processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor1)
+                self.wait_for_commit(log_monitor, running_processor1)
 
     def stop_streams(self, processor_to_be_stopped):
         with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
             processor_to_be_stopped.stop()
             self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
 
     def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
-        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
-            self.stop_streams(processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor)
+        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.LOG_FILE) as log_monitor:
+            with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as stdout_monitor:
+                self.stop_streams(processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor)
+                self.wait_for_commit(log_monitor, keep_alive_processor)
 
     def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
-            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor1)
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor:
+            with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor:
+                self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor1)
+                self.wait_for_commit(log_monitor, keep_alive_processor1)
 
     def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
-            with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
-                processor_to_be_aborted.stop_nodes(False)
-            self.wait_for_startup(monitor2, keep_alive_processor2)
-        self.wait_for_startup(monitor1, keep_alive_processor1)
-
-    def wait_for_startup(self, monitor, processor):
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor1:
+            with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor1:
+                with keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor2:
+                    with keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor2:
+                        processor_to_be_aborted.stop_nodes(False)
+                        self.wait_for_running(stdout_monitor2, keep_alive_processor2)
+                        self.wait_for_running(stdout_monitor1, keep_alive_processor1)
+                        self.wait_for_commit(log_monitor2, keep_alive_processor2)
+                        self.wait_for_commit(log_monitor1, keep_alive_processor1)
+
+    def wait_for_running(self, monitor, processor):
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
-        self.wait_for(monitor, processor, "processed [0-9]* records from topic")
+
+    def wait_for_commit(self, monitor, processor):
+        self.wait_for(monitor, processor, "Committed all active tasks \[[0-9_,]+\] and standby tasks \[[0-9_,]+\]")

Review comment:
       @mjsax I added this verification to ensure that the instance started to process data. It is a bit more complicate and  a bit more brittle because it relies on DEBUG logs, but it works for active and standby tasks. WDYT?
   If you think it is OK, I will start testing the system test locally.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#issuecomment-871339360


   From the logs, I see the following assignment:
   ```
   2021-06-29 09:57:45,026] INFO Decided on assignment: {
   
   016c5d71-b8a4-4e8a-9896-e757eefb68c6=[
       activeTasks: ([0_0, 0_2, 0_3]) 
       standbyTasks: ([0_1, 0_4]) 
       prevActiveTasks: ([0_0, 0_2]) 
       prevStandbyTasks: ([0_1, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=-2, 0_1=10880, 0_2=-2, 0_3=10428, 0_4=10964]) 
       taskLagTotals: ([0_0=-2, 0_1=280, 0_2=-2, 0_3=24, 0_4=4]) 
       capacity: 1 
       assigned: 5], 
   
   3134dbd9-4def-4574-b027-427ae654d8f5=[
       activeTasks: ([0_1, 0_4]) 
       standbyTasks: ([0_0, 0_2, 0_3]) 
       prevActiveTasks: ([0_1]) 
       prevStandbyTasks: ([0_0, 0_2, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=10226, 0_1=-2, 0_2=6254, 0_3=10448, 0_4=10680]) 
       taskLagTotals: ([0_0=580, 0_1=-2, 0_2=4998, 0_3=4, 0_4=288]) 
       capacity: 1 
       assigned: 5], 
   
   5c2faa18-0876-498a-b00d-9b5bd62297d1=[
       activeTasks: ([]) 
       standbyTasks: ([0_0, 0_1, 0_2, 0_3, 0_4]) 
       prevActiveTasks: ([]) 
       prevStandbyTasks: ([0_0, 0_1, 0_2, 0_3, 0_4]) 
       changelogOffsetTotalsByTask: ([0_0=0, 0_1=0, 0_2=0, 0_3=0, 0_4=2009]) 
       taskLagTotals: ([0_0=10806, 0_1=11160, 0_2=11252, 0_3=10452, 0_4=8959]) 
       capacity: 1 
       assigned: 5]
   } with followup probing rebalance. (org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor)
   ```
   `5c2faa18-0876-498a-b00d-9b5bd62297d1` is the Streams client that is shutdown uncleanly and verified when it restarts. The verification whether `5c2faa18-0876-498a-b00d-9b5bd62297d1` processed records will never be satisfied since the Streams client only has standby tasks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r668643843



##########
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##########
@@ -128,45 +128,61 @@ def run_failure_and_recovery(self, processor1, processor2, processor3, verifier)
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
     def add_streams(self, processor):
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            self.wait_for_startup(monitor, processor)
+        with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+            with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
+                processor.start()
+                self.wait_for_running(stdout_monitor, processor)
+                self.wait_for_commit(log_monitor, processor)
 
     def add_streams2(self, running_processor, processor_to_be_started):
-        with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
-            self.add_streams(processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor)
+        with running_processor.node.account.monitor_log(running_processor.LOG_FILE) as log_monitor:
+            with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as stdout_monitor:
+                self.add_streams(processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor)
+                self.wait_for_commit(log_monitor, running_processor)
 
     def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
-        with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
-            self.add_streams2(running_processor2, processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor1)
+        with running_processor1.node.account.monitor_log(running_processor1.LOG_FILE) as log_monitor:
+            with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as stdout_monitor:
+                self.add_streams2(running_processor2, processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor1)
+                self.wait_for_commit(log_monitor, running_processor1)
 
     def stop_streams(self, processor_to_be_stopped):
         with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
             processor_to_be_stopped.stop()
             self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
 
     def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
-        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
-            self.stop_streams(processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor)
+        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.LOG_FILE) as log_monitor:
+            with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as stdout_monitor:
+                self.stop_streams(processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor)
+                self.wait_for_commit(log_monitor, keep_alive_processor)
 
     def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
-            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor1)
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor:
+            with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor:
+                self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor1)
+                self.wait_for_commit(log_monitor, keep_alive_processor1)
 
     def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
-            with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
-                processor_to_be_aborted.stop_nodes(False)
-            self.wait_for_startup(monitor2, keep_alive_processor2)
-        self.wait_for_startup(monitor1, keep_alive_processor1)
-
-    def wait_for_startup(self, monitor, processor):
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor1:
+            with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor1:
+                with keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor2:
+                    with keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor2:
+                        processor_to_be_aborted.stop_nodes(False)
+                        self.wait_for_running(stdout_monitor2, keep_alive_processor2)
+                        self.wait_for_running(stdout_monitor1, keep_alive_processor1)
+                        self.wait_for_commit(log_monitor2, keep_alive_processor2)
+                        self.wait_for_commit(log_monitor1, keep_alive_processor1)
+
+    def wait_for_running(self, monitor, processor):
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
-        self.wait_for(monitor, processor, "processed [0-9]* records from topic")
+
+    def wait_for_commit(self, monitor, processor):
+        self.wait_for(monitor, processor, "Committed all active tasks \[[0-9_,]+\] and standby tasks \[[0-9_,]+\]")

Review comment:
       Assuming we always have something to commit seems indeed a bit brittle. Let me think about an alternative.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r668337811



##########
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##########
@@ -128,45 +128,61 @@ def run_failure_and_recovery(self, processor1, processor2, processor3, verifier)
         verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % verifier.STDOUT_FILE, allow_fail=False)
 
     def add_streams(self, processor):
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            self.wait_for_startup(monitor, processor)
+        with processor.node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+            with processor.node.account.monitor_log(processor.STDOUT_FILE) as stdout_monitor:
+                processor.start()
+                self.wait_for_running(stdout_monitor, processor)
+                self.wait_for_commit(log_monitor, processor)
 
     def add_streams2(self, running_processor, processor_to_be_started):
-        with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
-            self.add_streams(processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor)
+        with running_processor.node.account.monitor_log(running_processor.LOG_FILE) as log_monitor:
+            with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as stdout_monitor:
+                self.add_streams(processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor)
+                self.wait_for_commit(log_monitor, running_processor)
 
     def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
-        with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
-            self.add_streams2(running_processor2, processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor1)
+        with running_processor1.node.account.monitor_log(running_processor1.LOG_FILE) as log_monitor:
+            with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as stdout_monitor:
+                self.add_streams2(running_processor2, processor_to_be_started)
+                self.wait_for_running(stdout_monitor, running_processor1)
+                self.wait_for_commit(log_monitor, running_processor1)
 
     def stop_streams(self, processor_to_be_stopped):
         with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
             processor_to_be_stopped.stop()
             self.wait_for(monitor2, processor_to_be_stopped, "StateChange: PENDING_SHUTDOWN -> NOT_RUNNING")
 
     def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
-        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
-            self.stop_streams(processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor)
+        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.LOG_FILE) as log_monitor:
+            with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as stdout_monitor:
+                self.stop_streams(processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor)
+                self.wait_for_commit(log_monitor, keep_alive_processor)
 
     def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
-            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor1)
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor:
+            with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor:
+                self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
+                self.wait_for_running(stdout_monitor, keep_alive_processor1)
+                self.wait_for_commit(log_monitor, keep_alive_processor1)
 
     def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
-            with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
-                processor_to_be_aborted.stop_nodes(False)
-            self.wait_for_startup(monitor2, keep_alive_processor2)
-        self.wait_for_startup(monitor1, keep_alive_processor1)
-
-    def wait_for_startup(self, monitor, processor):
+        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor1:
+            with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor1:
+                with keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.LOG_FILE) as log_monitor2:
+                    with keep_alive_processor2.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as stdout_monitor2:
+                        processor_to_be_aborted.stop_nodes(False)
+                        self.wait_for_running(stdout_monitor2, keep_alive_processor2)
+                        self.wait_for_running(stdout_monitor1, keep_alive_processor1)
+                        self.wait_for_commit(log_monitor2, keep_alive_processor2)
+                        self.wait_for_commit(log_monitor1, keep_alive_processor1)
+
+    def wait_for_running(self, monitor, processor):
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
-        self.wait_for(monitor, processor, "processed [0-9]* records from topic")
+
+    def wait_for_commit(self, monitor, processor):
+        self.wait_for(monitor, processor, "Committed all active tasks \[[0-9_,]+\] and standby tasks \[[0-9_,]+\]")

Review comment:
       Is this also logged if there is nothing to be committed? Just double checking to ensure we don't introduce a new issue -- we only commit, if we have anything to commit IIRC.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #10944: MINOR: Loose verification of startup in EOS system tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #10944:
URL: https://github.com/apache/kafka/pull/10944#discussion_r661370950



##########
File path: tests/kafkatest/tests/streams/streams_eos_test.py
##########
@@ -166,7 +166,6 @@ def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_
 
     def wait_for_startup(self, monitor, processor):
         self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING")
-        self.wait_for(monitor, processor, "processed [0-9]* records from topic")

Review comment:
       Is this verification strictly needed? If the Streams client has only standby tasks assigned, it will not process records but it will be in state RUNNING. A Streams client that has only standby tasks assigned seems a valid situation to me that should not make the test fail.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org