You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/06/18 14:42:55 UTC

[qpid-dispatch] branch main updated: NO-JIRA: fix parallel priority test timeouts

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/main by this push:
     new e7fe160  NO-JIRA: fix parallel priority test timeouts
e7fe160 is described below

commit e7fe1603e1667c7b05a76dd8ba86e912608522fd
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Jun 17 12:37:50 2021 -0400

    NO-JIRA: fix parallel priority test timeouts
    
    Update test clients to output debug logging.
    
    This closes #1265
---
 tests/system_tests_edge_router.py |  6 +++--
 tests/system_tests_link_routes.py |  8 ++++---
 tests/system_tests_router_mesh.py | 16 +++++++------
 tests/system_tests_two_routers.py |  6 +++--
 tests/test-receiver.c             | 25 +++++++++++++++++++-
 tests/test-sender.c               | 49 ++++++++++++++++++++++-----------------
 6 files changed, 74 insertions(+), 36 deletions(-)

diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index fcb41f5..99b358f 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -3110,7 +3110,8 @@ class StreamingMessageTest(TestCase):
                "-i", "TestReceiver-%d" % self._container_index,
                "-a", router.listener,
                "-c", str(count),
-               "-s", address]
+               "-s", address,
+               "-d"]
         self._container_index += 1
         env = dict(os.environ, PN_TRACE_FRM="1")
         return self.popen(cmd, expect=expect, env=env)
@@ -3125,7 +3126,8 @@ class StreamingMessageTest(TestCase):
                "-a", router.listener,
                "-c", str(count),
                "-t", address,
-               size]
+               size,
+               "-d"]
         self._container_index += 1
         env = dict(os.environ, PN_TRACE_FRM="1")
         return self.popen(cmd, expect=expect, env=env)
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 88c607e..81246e8 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -2653,7 +2653,7 @@ class LinkRoute3Hop(TestCase):
         same session.
         """
         send_clients = 10
-        send_batch = 10
+        send_batch = 5
         total = send_clients * send_batch
 
         fake_service = FakeService(self.QDR_A.addresses[1],
@@ -2665,7 +2665,8 @@ class LinkRoute3Hop(TestCase):
         rx = self.popen(["test-receiver",
                          "-a", self.QDR_C.addresses[0],
                          "-c", str(total),
-                         "-s", "closest/test-client"],
+                         "-s", "closest/test-client",
+                         "-d"],
                         env=env,
                         expect=Process.EXIT_OK)
 
@@ -2675,7 +2676,8 @@ class LinkRoute3Hop(TestCase):
                                "-c", str(send_batch),
                                "-i", "TestSender-%s" % x,
                                "-sx",   # huge message size to trigger Q2/Q3
-                               "-t", "closest/test-client"],
+                               "-t", "closest/test-client",
+                               "-d"],
                               env=env,
                               expect=Process.EXIT_OK)
 
diff --git a/tests/system_tests_router_mesh.py b/tests/system_tests_router_mesh.py
index 383b384..5b42b6a 100644
--- a/tests/system_tests_router_mesh.py
+++ b/tests/system_tests_router_mesh.py
@@ -205,8 +205,7 @@ class ThreeRouterTest(TestCase):
 
     def test_06_parallel_priority(self):
         """
-        Create 10 senders each with a different priority.  Send large messages
-        - large enough to trigger Qx flow control (sender argument "-sx").
+        Create 10 senders each with a different priority.
         Ensure all messages arrive as expected.
         """
         priorities = 10
@@ -214,23 +213,26 @@ class ThreeRouterTest(TestCase):
 
         total = priorities * send_batch
         rx = self.spawn_receiver(self.RouterC,
-                                 count=total,
-                                 address="closest/test_06_address")
+                                 total,
+                                 "closest/test_06_address",
+                                 "-d")
         self.RouterA.wait_address("closest/test_06_address")
 
         senders = [self.spawn_sender(self.RouterA,
                                      send_batch,
                                      "closest/test_06_address",
-                                     "-sx", "-p%s" % p)
+                                     "-sm", "-p%s" % p, "-d")
                    for p in range(priorities)]
 
-        if rx.wait(timeout=TIMEOUT):
-            raise Exception("Receiver failed to consume all messages")
+        # wait for all senders to finish first, then check the receiver
         for tx in senders:
             out_text, out_err = tx.communicate(timeout=TIMEOUT)
             if tx.returncode:
                 raise Exception("Sender failed: %s %s" % (out_text, out_err))
 
+        if rx.wait(timeout=TIMEOUT):
+            raise Exception("Receiver failed to consume all messages")
+
 
 if __name__ == '__main__':
     unittest.main(main_module())
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 6e481a6..a287411 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -1998,7 +1998,8 @@ class StreamingLinkScrubberTest(TestCase):
         cmd = ["test-receiver",
                "-a", self.RouterB.listener,
                "-s", address,
-               "-c", str(sender_count)]
+               "-c", str(sender_count),
+               "-d"]
         rx = self.popen(cmd, env=env)
 
         self.RouterA.wait_address(address)
@@ -2011,7 +2012,8 @@ class StreamingLinkScrubberTest(TestCase):
                "-a", self.RouterA.listener,
                "-t", address,
                "-c", "1",
-               "-sx"
+               "-sx",
+               "-d"
                ]
         senders = [self.popen(cmd, env=env) for x in range(sender_count)]
 
diff --git a/tests/test-receiver.c b/tests/test-receiver.c
index 38820db..14f8c9b 100644
--- a/tests/test-receiver.c
+++ b/tests/test-receiver.c
@@ -32,9 +32,13 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <time.h>
 #include <unistd.h>
 
+#define BOOL2STR(b) ((b)?"true":"false")
+
 bool stop = false;
+bool verbose = false;
 
 int  credit_window = 1000;
 char *source_address = "test-address";  // name of the source node to receive from
@@ -155,6 +159,7 @@ static void usage(void)
     printf("-s      \tSource address [%s]\n", source_address);
     printf("-w      \tCredit window [%d]\n", credit_window);
     printf("-E      \tExit without cleanly closing the connection [off]\n");
+    printf("-d      \tPrint periodic status updates [%s]\n", BOOL2STR(verbose));
     exit(1);
 }
 
@@ -169,7 +174,7 @@ int main(int argc, char** argv)
     /* command line options */
     opterr = 0;
     int c;
-    while((c = getopt(argc, argv, "i:a:s:hw:c:E")) != -1) {
+    while((c = getopt(argc, argv, "i:a:s:hdw:c:E")) != -1) {
         switch(c) {
         case 'h': usage(); break;
         case 'a': host_address = optarg; break;
@@ -184,6 +189,7 @@ int main(int argc, char** argv)
                 usage();
             break;
         case 'E': drop_connection = true;  break;
+        case 'd': verbose = true;          break;
 
         default:
             usage();
@@ -219,6 +225,7 @@ int main(int argc, char** argv)
 
     pn_reactor_start(reactor);
 
+    time_t last_log = time(NULL);
     while (pn_reactor_process(reactor)) {
         if (stop) {
             if (drop_connection)  // hard exit
@@ -228,6 +235,17 @@ int main(int argc, char** argv)
             if (pn_link) pn_link_close(pn_link);
             if (pn_ssn) pn_session_close(pn_ssn);
             if (pn_conn) pn_connection_close(pn_conn);
+
+        } else if (verbose) {
+
+            // periodically give status for test output logs
+
+            time_t now = time(NULL);
+            if ((now - last_log) >= 10) {
+                fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
+                fflush(stdout);
+                last_log = now;
+            }
         }
     }
 
@@ -237,5 +255,10 @@ int main(int argc, char** argv)
 
     pn_reactor_free(reactor);
 
+    if (verbose) {
+        fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
+        fflush(stdout);
+    }
+
     return 0;
 }
diff --git a/tests/test-sender.c b/tests/test-sender.c
index 322de00..57867b4 100644
--- a/tests/test-sender.c
+++ b/tests/test-sender.c
@@ -55,6 +55,7 @@
 //
 
 bool stop = false;
+bool verbose = false;
 
 uint64_t limit = 1;               // # messages to send
 uint64_t count = 0;               // # sent
@@ -357,6 +358,7 @@ static void usage(void)
   printf("-E      \tExit without cleanly closing the connection [off]\n");
   printf("-p      \tMessage priority [%d]\n", priority);
   printf("-X      \tMessage body data pattern [%c]\n", (char)body_data_pattern);
+  printf("-d      \tPrint periodic status updates [%s]\n", BOOL2STR(verbose));
   exit(1);
 }
 
@@ -365,7 +367,7 @@ int main(int argc, char** argv)
     /* command line options */
     opterr = 0;
     int c;
-    while ((c = getopt(argc, argv, "ha:c:i:ns:t:uMEp:X:")) != -1) {
+    while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEp:X:")) != -1) {
         switch(c) {
         case 'h': usage(); break;
         case 'a': host_address = optarg; break;
@@ -386,6 +388,7 @@ int main(int argc, char** argv)
             }
             break;
         case 't': target_address = optarg; break;
+        case 'd': verbose = true;          break;
         case 'u': presettle = true;        break;
         case 'M': add_annotations = true;  break;
         case 'E': drop_connection = true;  break;
@@ -432,7 +435,7 @@ int main(int argc, char** argv)
 
     pn_reactor_start(reactor);
 
-    time_t last_log = 0;
+    time_t last_log = time(NULL);
     while (pn_reactor_process(reactor)) {
         if (stop) {
             if (drop_connection) {  // hard stop
@@ -442,26 +445,22 @@ int main(int argc, char** argv)
                         count, accepted, rejected, released, modified);
                 exit(0);
             }
+            if (pn_link) pn_link_close(pn_link);
+            if (pn_ssn) pn_session_close(pn_ssn);
+            if (pn_conn) pn_connection_close(pn_conn);
 
-            // wait (forever) until all sent messages are confirmed by the
-            // receiver
-
-            if (count == acked) {
-                // close the endpoints this will cause pn_reactor_process() to
-                // eventually break the loop
-                if (pn_link) pn_link_close(pn_link);
-                if (pn_ssn) pn_session_close(pn_ssn);
-                if (pn_conn) pn_connection_close(pn_conn);
-            } else {
-                // periodically give status for test output logs
-                time_t now = time(NULL);
-                if ((now - last_log) >= 1) {
-                    fprintf(stdout,
-                            "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
-                            " Released:%"PRIu64" Modified:%"PRIu64"\n",
-                            count, accepted, rejected, released, modified);
-                    last_log = now;
-                }
+        } else if (verbose) {
+
+            // periodically give status for test output logs
+
+            time_t now = time(NULL);
+            if ((now - last_log) >= 10) {
+                fprintf(stdout,
+                        "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+                        " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n",
+                        count, accepted, rejected, released, modified, limit);
+                fflush(stdout);
+                last_log = now;
             }
         }
     }
@@ -472,5 +471,13 @@ int main(int argc, char** argv)
 
     pn_reactor_free(reactor);
 
+    if (verbose) {
+        fprintf(stdout,
+                "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+                " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n",
+                count, accepted, rejected, released, modified, limit);
+        fflush(stdout);
+    }
+
     return 0;
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org