You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2021/06/18 14:42:55 UTC
[qpid-dispatch] branch main updated: NO-JIRA: fix parallel priority test timeouts
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/main by this push:
new e7fe160 NO-JIRA: fix parallel priority test timeouts
e7fe160 is described below
commit e7fe1603e1667c7b05a76dd8ba86e912608522fd
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Thu Jun 17 12:37:50 2021 -0400
NO-JIRA: fix parallel priority test timeouts
Update test clients to output debug logging.
This closes #1265
---
tests/system_tests_edge_router.py | 6 +++--
tests/system_tests_link_routes.py | 8 ++++---
tests/system_tests_router_mesh.py | 16 +++++++------
tests/system_tests_two_routers.py | 6 +++--
tests/test-receiver.c | 25 +++++++++++++++++++-
tests/test-sender.c | 49 ++++++++++++++++++++++-----------------
6 files changed, 74 insertions(+), 36 deletions(-)
diff --git a/tests/system_tests_edge_router.py b/tests/system_tests_edge_router.py
index fcb41f5..99b358f 100644
--- a/tests/system_tests_edge_router.py
+++ b/tests/system_tests_edge_router.py
@@ -3110,7 +3110,8 @@ class StreamingMessageTest(TestCase):
"-i", "TestReceiver-%d" % self._container_index,
"-a", router.listener,
"-c", str(count),
- "-s", address]
+ "-s", address,
+ "-d"]
self._container_index += 1
env = dict(os.environ, PN_TRACE_FRM="1")
return self.popen(cmd, expect=expect, env=env)
@@ -3125,7 +3126,8 @@ class StreamingMessageTest(TestCase):
"-a", router.listener,
"-c", str(count),
"-t", address,
- size]
+ size,
+ "-d"]
self._container_index += 1
env = dict(os.environ, PN_TRACE_FRM="1")
return self.popen(cmd, expect=expect, env=env)
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 88c607e..81246e8 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -2653,7 +2653,7 @@ class LinkRoute3Hop(TestCase):
same session.
"""
send_clients = 10
- send_batch = 10
+ send_batch = 5
total = send_clients * send_batch
fake_service = FakeService(self.QDR_A.addresses[1],
@@ -2665,7 +2665,8 @@ class LinkRoute3Hop(TestCase):
rx = self.popen(["test-receiver",
"-a", self.QDR_C.addresses[0],
"-c", str(total),
- "-s", "closest/test-client"],
+ "-s", "closest/test-client",
+ "-d"],
env=env,
expect=Process.EXIT_OK)
@@ -2675,7 +2676,8 @@ class LinkRoute3Hop(TestCase):
"-c", str(send_batch),
"-i", "TestSender-%s" % x,
"-sx", # huge message size to trigger Q2/Q3
- "-t", "closest/test-client"],
+ "-t", "closest/test-client",
+ "-d"],
env=env,
expect=Process.EXIT_OK)
diff --git a/tests/system_tests_router_mesh.py b/tests/system_tests_router_mesh.py
index 383b384..5b42b6a 100644
--- a/tests/system_tests_router_mesh.py
+++ b/tests/system_tests_router_mesh.py
@@ -205,8 +205,7 @@ class ThreeRouterTest(TestCase):
def test_06_parallel_priority(self):
"""
- Create 10 senders each with a different priority. Send large messages
- - large enough to trigger Qx flow control (sender argument "-sx").
+ Create 10 senders each with a different priority.
Ensure all messages arrive as expected.
"""
priorities = 10
@@ -214,23 +213,26 @@ class ThreeRouterTest(TestCase):
total = priorities * send_batch
rx = self.spawn_receiver(self.RouterC,
- count=total,
- address="closest/test_06_address")
+ total,
+ "closest/test_06_address",
+ "-d")
self.RouterA.wait_address("closest/test_06_address")
senders = [self.spawn_sender(self.RouterA,
send_batch,
"closest/test_06_address",
- "-sx", "-p%s" % p)
+ "-sm", "-p%s" % p, "-d")
for p in range(priorities)]
- if rx.wait(timeout=TIMEOUT):
- raise Exception("Receiver failed to consume all messages")
+ # wait for all senders to finish first, then check the receiver
for tx in senders:
out_text, out_err = tx.communicate(timeout=TIMEOUT)
if tx.returncode:
raise Exception("Sender failed: %s %s" % (out_text, out_err))
+ if rx.wait(timeout=TIMEOUT):
+ raise Exception("Receiver failed to consume all messages")
+
if __name__ == '__main__':
unittest.main(main_module())
diff --git a/tests/system_tests_two_routers.py b/tests/system_tests_two_routers.py
index 6e481a6..a287411 100644
--- a/tests/system_tests_two_routers.py
+++ b/tests/system_tests_two_routers.py
@@ -1998,7 +1998,8 @@ class StreamingLinkScrubberTest(TestCase):
cmd = ["test-receiver",
"-a", self.RouterB.listener,
"-s", address,
- "-c", str(sender_count)]
+ "-c", str(sender_count),
+ "-d"]
rx = self.popen(cmd, env=env)
self.RouterA.wait_address(address)
@@ -2011,7 +2012,8 @@ class StreamingLinkScrubberTest(TestCase):
"-a", self.RouterA.listener,
"-t", address,
"-c", "1",
- "-sx"
+ "-sx",
+ "-d"
]
senders = [self.popen(cmd, env=env) for x in range(sender_count)]
diff --git a/tests/test-receiver.c b/tests/test-receiver.c
index 38820db..14f8c9b 100644
--- a/tests/test-receiver.c
+++ b/tests/test-receiver.c
@@ -32,9 +32,13 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <time.h>
#include <unistd.h>
+#define BOOL2STR(b) ((b)?"true":"false")
+
bool stop = false;
+bool verbose = false;
int credit_window = 1000;
char *source_address = "test-address"; // name of the source node to receive from
@@ -155,6 +159,7 @@ static void usage(void)
printf("-s \tSource address [%s]\n", source_address);
printf("-w \tCredit window [%d]\n", credit_window);
printf("-E \tExit without cleanly closing the connection [off]\n");
+ printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose));
exit(1);
}
@@ -169,7 +174,7 @@ int main(int argc, char** argv)
/* command line options */
opterr = 0;
int c;
- while((c = getopt(argc, argv, "i:a:s:hw:c:E")) != -1) {
+ while((c = getopt(argc, argv, "i:a:s:hdw:c:E")) != -1) {
switch(c) {
case 'h': usage(); break;
case 'a': host_address = optarg; break;
@@ -184,6 +189,7 @@ int main(int argc, char** argv)
usage();
break;
case 'E': drop_connection = true; break;
+ case 'd': verbose = true; break;
default:
usage();
@@ -219,6 +225,7 @@ int main(int argc, char** argv)
pn_reactor_start(reactor);
+ time_t last_log = time(NULL);
while (pn_reactor_process(reactor)) {
if (stop) {
if (drop_connection) // hard exit
@@ -228,6 +235,17 @@ int main(int argc, char** argv)
if (pn_link) pn_link_close(pn_link);
if (pn_ssn) pn_session_close(pn_ssn);
if (pn_conn) pn_connection_close(pn_conn);
+
+ } else if (verbose) {
+
+ // periodically give status for test output logs
+
+ time_t now = time(NULL);
+ if ((now - last_log) >= 10) {
+ fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
+ fflush(stdout);
+ last_log = now;
+ }
}
}
@@ -237,5 +255,10 @@ int main(int argc, char** argv)
pn_reactor_free(reactor);
+ if (verbose) {
+ fprintf(stdout, "Received:%"PRIu64" of %"PRIu64"\n", count, limit);
+ fflush(stdout);
+ }
+
return 0;
}
diff --git a/tests/test-sender.c b/tests/test-sender.c
index 322de00..57867b4 100644
--- a/tests/test-sender.c
+++ b/tests/test-sender.c
@@ -55,6 +55,7 @@
//
bool stop = false;
+bool verbose = false;
uint64_t limit = 1; // # messages to send
uint64_t count = 0; // # sent
@@ -357,6 +358,7 @@ static void usage(void)
printf("-E \tExit without cleanly closing the connection [off]\n");
printf("-p \tMessage priority [%d]\n", priority);
printf("-X \tMessage body data pattern [%c]\n", (char)body_data_pattern);
+ printf("-d \tPrint periodic status updates [%s]\n", BOOL2STR(verbose));
exit(1);
}
@@ -365,7 +367,7 @@ int main(int argc, char** argv)
/* command line options */
opterr = 0;
int c;
- while ((c = getopt(argc, argv, "ha:c:i:ns:t:uMEp:X:")) != -1) {
+ while ((c = getopt(argc, argv, "ha:c:i:ns:t:udMEp:X:")) != -1) {
switch(c) {
case 'h': usage(); break;
case 'a': host_address = optarg; break;
@@ -386,6 +388,7 @@ int main(int argc, char** argv)
}
break;
case 't': target_address = optarg; break;
+ case 'd': verbose = true; break;
case 'u': presettle = true; break;
case 'M': add_annotations = true; break;
case 'E': drop_connection = true; break;
@@ -432,7 +435,7 @@ int main(int argc, char** argv)
pn_reactor_start(reactor);
- time_t last_log = 0;
+ time_t last_log = time(NULL);
while (pn_reactor_process(reactor)) {
if (stop) {
if (drop_connection) { // hard stop
@@ -442,26 +445,22 @@ int main(int argc, char** argv)
count, accepted, rejected, released, modified);
exit(0);
}
+ if (pn_link) pn_link_close(pn_link);
+ if (pn_ssn) pn_session_close(pn_ssn);
+ if (pn_conn) pn_connection_close(pn_conn);
- // wait (forever) until all sent messages are confirmed by the
- // receiver
-
- if (count == acked) {
- // close the endpoints this will cause pn_reactor_process() to
- // eventually break the loop
- if (pn_link) pn_link_close(pn_link);
- if (pn_ssn) pn_session_close(pn_ssn);
- if (pn_conn) pn_connection_close(pn_conn);
- } else {
- // periodically give status for test output logs
- time_t now = time(NULL);
- if ((now - last_log) >= 1) {
- fprintf(stdout,
- "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
- " Released:%"PRIu64" Modified:%"PRIu64"\n",
- count, accepted, rejected, released, modified);
- last_log = now;
- }
+ } else if (verbose) {
+
+ // periodically give status for test output logs
+
+ time_t now = time(NULL);
+ if ((now - last_log) >= 10) {
+ fprintf(stdout,
+ "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+ " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n",
+ count, accepted, rejected, released, modified, limit);
+ fflush(stdout);
+ last_log = now;
}
}
}
@@ -472,5 +471,13 @@ int main(int argc, char** argv)
pn_reactor_free(reactor);
+ if (verbose) {
+ fprintf(stdout,
+ "Sent:%"PRIu64" Accepted:%"PRIu64" Rejected:%"PRIu64
+ " Released:%"PRIu64" Modified:%"PRIu64" Limit:%"PRIu64"\n",
+ count, accepted, rejected, released, modified, limit);
+ fflush(stdout);
+ }
+
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org