You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/10/13 15:21:40 UTC

[qpid-dispatch] branch dev-protocol-adaptors updated: DISPATCH-1744: add multi-hop large msg tests

This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
     new ae6d2c2  DISPATCH-1744: add multi-hop large msg tests
ae6d2c2 is described below

commit ae6d2c29101a4feb860ac24ffc10837f143700b1
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Oct 7 09:50:53 2020 -0400

    DISPATCH-1744: add multi-hop large msg tests
---
 src/adaptors/http1/http1_adaptor.c  |   7 +-
 src/adaptors/http1/http1_codec.c    |   3 +-
 src/adaptors/http1/http1_private.h  |   3 +
 src/adaptors/http1/http1_server.c   |  17 +-
 tests/system_tests_http1_adaptor.py | 300 ++++++++++++++++++++----------------
 5 files changed, 179 insertions(+), 151 deletions(-)

diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 48e9326..4663385 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -30,9 +30,6 @@
 // to HTTP endpoint processing.
 //
 
-// for debug: will dump raw buffer content to stdout if true
-#define HTTP1_DUMP_BUFFERS false
-
 #define RAW_BUFFER_BATCH  16
 
 
@@ -384,8 +381,8 @@ void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn)
             if (HTTP1_DUMP_BUFFERS) {
                 char *ptr = (char*) buffers[i].bytes;
                 int len = (int) buffers[i].size;
-                fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d\n  value='%.*s'\n",
-                        hconn->conn_id, (void*)ptr, len, len, ptr);
+                fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d c=%d o=%d\n  value='%.*s'\n",
+                        hconn->conn_id, (void*)ptr, len, buffers[i].capacity, buffers[i].offset, len, ptr);
                 fflush(stdout);
             }
 
diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c
index d1fbea9..7b63937 100644
--- a/src/adaptors/http1/http1_codec.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -1087,10 +1087,11 @@ static bool parse_body_chunked_data(h1_codec_connection_t *conn, struct decoder_
     decoder->chunk_length -= skipped;
     body_ptr->remaining += skipped;
 
+    consume_body_data(conn, false);
+
     if (decoder->chunk_length == 0) {
         // end of chunk
         decoder->chunk_state = HTTP1_CHUNK_HEADER;
-        consume_body_data(conn, false);
     }
 
     return !!rptr->remaining;
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index d82e421..8e29ae2 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -34,6 +34,9 @@
 #include <qpid/dispatch/message.h>
 #include "adaptors/http_common.h"
 
+// for debug: will dump I/O buffer content to stdout if true
+#define HTTP1_DUMP_BUFFERS false
+
 typedef struct qdr_http1_out_data_t      qdr_http1_out_data_t;
 typedef struct qdr_http1_out_data_fifo_t qdr_http1_out_data_fifo_t;
 typedef struct qdr_http1_request_base_t  qdr_http1_request_base_t;
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 2d11600..69df490 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -27,9 +27,6 @@
 // connection is terminated at an HTTP server, not an HTTP client.
 //
 
-// for debug: dump raw buffers to stdout if true
-#define HTTP1_DUMP_BUFFERS false
-
 
 //
 // State for a single response message arriving via the raw connection.  This
@@ -1107,6 +1104,7 @@ static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg
     uint64_t outcome = 0;
 
     assert(!hreq->base.lib_rs);
+    assert(qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) == QD_MESSAGE_DEPTH_OK);
 
     // method is passed in the SUBJECT field
     qd_iterator_t *method_iter = qd_message_field_iterator(msg, QD_FIELD_SUBJECT);
@@ -1220,19 +1218,6 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
 {
     qdr_http1_connection_t    *hconn = hreq->base.hconn;
     qd_message_t                *msg = qdr_delivery_message(hreq->request_dlv);
-    qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
-
-    if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
-        return 0;
-
-    if (status == QD_MESSAGE_DEPTH_INVALID) {
-        qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
-               "[C%"PRIu64"][L%"PRIu64"] body data depth check failed",
-               hconn->conn_id, hconn->out_link_id);
-        return PN_REJECTED;
-    }
-
-    assert(status == QD_MESSAGE_DEPTH_OK);
 
     if (!hreq->headers_encoded) {
         uint64_t outcome = _send_request_headers(hreq, msg);
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index dbe1627..2f9ea1f 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -164,9 +164,6 @@ class RequestHandler(BaseHTTPRequestHandler):
     def log_request(self, code=None, size=None):
         pass
 
-    def log_error(self, format=None, *args):
-        pass
-
     def log_message(self, format=None, *args):
         pass
 
@@ -187,9 +184,12 @@ class RequestHandler(BaseHTTPRequestHandler):
                 body = b''
                 while True:
                     header = self.rfile.readline().strip().split(b';')[0]
-                    data = self.rfile.readline().rstrip()
-                    body += data
-                    if int(header) == 0:
+                    hlen = int(header, base=16)
+                    if hlen > 0:
+                        data = self.rfile.read(hlen + 2)  #+\r\n
+                        body += data[:-2]
+                    else:
+                        self.rfile.readline()  # discard last \r\n
                         break;
                 return body
         return self.rfile.read()
@@ -247,7 +247,8 @@ class TestServer(object):
             client.putrequest("POST", "/SHUTDOWN")
             client.putheader("Content-Length", "0")
             client.endheaders()
-            client.getresponse()
+            # 13 == len('Server Closed')
+            client.getresponse().read(13)
             client.close()
             self._thread.join(timeout=TIMEOUT)
         if self._server:
@@ -274,26 +275,25 @@ class ThreadedTestClient(object):
         for loop in range(self._repeat):
             for op, tests in self._tests.items():
                 for req, _, val in tests:
-                    self._logger.log("TestClient sending request")
+                    self._logger.log("TestClient sending %s %s request" % (op, req.target))
                     req.send_request(client)
-                    self._logger.log("TestClient getting response")
+                    self._logger.log("TestClient getting %s response" % op)
                     rsp = client.getresponse()
-                    self._logger.log("TestClient response received")
+                    self._logger.log("TestClient response %s received" % op)
                     if val:
                         try:
                             body = val.check_response(rsp)
                         except Exception as exc:
-                            self._logger.log("TestClient response invalid: %s",
-                                             str(exc))
+                            self._logger.log("TestClient response invalid: %s"
+                                             % str(exc))
                             self.error = "client failed: %s" % str(exc)
                             return
 
-                        if req.method is "BODY" and body != b'':
-                            self._logger.log("TestClient response invalid: %s",
-                                             "body present!")
+                        if req.method == "BODY" and body != b'':
+                            self._logger.log("TestClient response invalid: %s"
+                                             % "body present!")
                             self.error = "error: body present!"
                             return
-
         client.close()
         self._logger.log("TestClient to %s closed" % self._conn_addr)
 
@@ -493,7 +493,7 @@ class Http1AdaptorOneRouterTest(TestCase):
         ]
     }
 
-    # HTTP/1.0 compliant test cases  (no chunked, response length unspecified)
+    # HTTP/1.0 compliant test cases (no chunked, response length unspecified)
     TESTS_10 = {
         #
         # GET
@@ -530,15 +530,6 @@ class Http1AdaptorOneRouterTest(TestCase):
                           body=b'?')],
              ResponseValidator(expect_headers={'Content-Type': "text/plain;charset=utf-8"},
                                expect_body=b'?')),
-
-            # (RequestMsg("GET", "/GET/no_length",
-            #             headers={"Content-Length": "0"}),
-            #  ResponseMsg(200, reason="OK",
-            #              headers={"Content-Type": "text/plain;charset=utf-8",
-            #                       "connection": "close"
-            #              },
-            #              body=b'Hi! ' * 1024 + b'X'),
-            #  ResponseValidator(expect_body=b'Hi! ' * 1024 + b'X')),
         ],
         #
         # HEAD
@@ -648,14 +639,10 @@ class Http1AdaptorOneRouterTest(TestCase):
         #  <clients>  <servers>
 
         cls.routers = []
-        #cls.http_server11_port = cls.tester.get_port()
-        #cls.http_server10_port = cls.tester.get_port()
-        #cls.http_listener11_port = cls.tester.get_port()
-        #cls.http_listener10_port = cls.tester.get_port()
-        cls.http_server11_port = 9090
-        cls.http_listener11_port = 8080
-        cls.http_server10_port = 9091
-        cls.http_listener10_port = 8081
+        cls.http_server11_port = cls.tester.get_port()
+        cls.http_server10_port = cls.tester.get_port()
+        cls.http_listener11_port = cls.tester.get_port()
+        cls.http_listener10_port = cls.tester.get_port()
 
         router('INT.A', 'standalone',
                [('httpConnector', {'port': cls.http_server11_port,
@@ -701,7 +688,7 @@ class Http1AdaptorOneRouterTest(TestCase):
             except Exception as exc:
                 self.fail("request failed:  %s" % str(exc))
 
-            if req.method is "BODY":
+            if req.method == "BODY":
                 self.assertEqual(b'', body)
 
     def test_001_get(self):
@@ -753,65 +740,16 @@ class Http1AdaptorOneRouterTest(TestCase):
         client.close()
 
 
-class Http1AdaptorInteriorTest(TestCase):
+class Http1AdaptorEdge2EdgeTest(TestCase):
     """
-    Test an HTTP server connected to an interior router serving multiple HTTP
-    clients
+    Test an HTTP servers and clients attached to edge routers separated by an
+    interior router
     """
-    TESTS = {
-        "PUT": [
-            (RequestMsg("PUT", "/PUT/test",
-                        headers={"Header-1": "Value",
-                                 "Header-2": "Value",
-                                 "Content-Length": "20",
-                                 "Content-Type": "text/plain;charset=utf-8"},
-                        body=b'!' * 20),
-             ResponseMsg(201, reason="Created",
-                         headers={"Response-Header": "data",
-                                  "Content-Length": "0"}),
-             ResponseValidator(status=201)
-            )],
-
-        "POST": [
-            (RequestMsg("POST", "/POST/test",
-                        headers={"Header-1": "X",
-                                 "Content-Length": "11",
-                                 "Content-Type": "application/x-www-form-urlencoded"},
-                        body=b'one=1' + b'&two=2'),
-             ResponseMsg(200, reason="OK",
-                         headers={"Response-Header": "whatever",
-                                  "Content-Length": 10},
-                         body=b'0123456789'),
-             ResponseValidator()
-            )],
-
-        "GET": [
-            (RequestMsg("GET", "/GET/test",
-                        headers={"Content-Length": "000"}),
-             ResponseMsg(200, reason="OK",
-                         headers={"Content-Length": "655",
-                                  "Content-Type": "text/plain;charset=utf-8"},
-                         body=b'?' * 655),
-             ResponseValidator(expect_headers={'Content-Length': '655'},
-                               expect_body=b'?' * 655)
-            )],
-
-        "PUT": [
-            (RequestMsg("PUT", "/PUT/chunked",
-                        headers={"Transfer-Encoding": "chunked",
-                                 "Content-Type": "text/plain;charset=utf-8"},
-                        body=b'16\r\n' + b'!' * 0x16 + b'\r\n'
-                        + b'0\r\n\r\n'),
-             ResponseMsg(204, reason="No Content",
-                        headers={"Content-Length": "000"}),
-             ResponseValidator(status=204)
-            )],
-    }
 
     @classmethod
     def setUpClass(cls):
         """Start a router"""
-        super(Http1AdaptorInteriorTest, cls).setUpClass()
+        super(Http1AdaptorEdge2EdgeTest, cls).setUpClass()
 
         def router(name, mode, extra):
             config = [
@@ -833,72 +771,176 @@ class Http1AdaptorInteriorTest(TestCase):
         # configuration:
         # one edge, one interior
         #
-        #  +-------+    +---------+
-        #  |  EA1  |<==>|  INT.A  |
-        #  +-------+    +---------+
-        #      ^             ^
-        #      |             |
-        #      V             V
-        #  <clients>      <server>
+        #  +-------+    +---------+    +-------+
+        #  |  EA1  |<==>|  INT.A  |<==>|  EA2  |
+        #  +-------+    +---------+    +-------+
+        #      ^                           ^
+        #      |                           |
+        #      V                           V
+        #  <clients>                   <servers>
 
         cls.routers = []
-        cls.INTA_edge_port   = cls.tester.get_port()
-        #cls.http_server_port = cls.tester.get_port()
-        #cls.http_listener_port = cls.tester.get_port()
-        cls.http_server_port = 9090
-        cls.http_listener_port = 8080
+        cls.INTA_edge1_port   = cls.tester.get_port()
+        cls.INTA_edge2_port   = cls.tester.get_port()
+        cls.http_server11_port = cls.tester.get_port()
+        cls.http_listener11_port = cls.tester.get_port()
+        cls.http_server10_port = cls.tester.get_port()
+        cls.http_listener10_port = cls.tester.get_port()
 
         router('INT.A', 'interior',
-               [('listener', {'role': 'edge', 'port': cls.INTA_edge_port}),
-                ('httpConnector', {'port': cls.http_server_port,
-                                   'protocolVersion': 'HTTP1',
-                                   'address': 'testServer'})
+               [('listener', {'role': 'edge', 'port': cls.INTA_edge1_port}),
+                ('listener', {'role': 'edge', 'port': cls.INTA_edge2_port}),
                ])
         cls.INT_A = cls.routers[0]
         cls.INT_A.listener = cls.INT_A.addresses[0]
 
         router('EA1', 'edge',
                [('connector', {'name': 'uplink', 'role': 'edge',
-                               'port': cls.INTA_edge_port}),
-                ('httpListener', {'port': cls.http_listener_port,
+                               'port': cls.INTA_edge1_port}),
+                ('httpListener', {'port': cls.http_listener11_port,
                                   'protocolVersion': 'HTTP1',
-                                  'address': 'testServer'})
+                                  'address': 'testServer11'}),
+                ('httpListener', {'port': cls.http_listener10_port,
+                                  'protocolVersion': 'HTTP1',
+                                  'address': 'testServer10'})
                ])
         cls.EA1 = cls.routers[1]
         cls.EA1.listener = cls.EA1.addresses[0]
 
-        cls.EA1.wait_connectors()
-        cls.INT_A.wait_address('EA1')
+        router('EA2', 'edge',
+               [('connector', {'name': 'uplink', 'role': 'edge',
+                               'port': cls.INTA_edge2_port}),
+                ('httpConnector', {'port': cls.http_server11_port,
+                                   'protocolVersion': 'HTTP1',
+                                   'address': 'testServer11'}),
+                ('httpConnector', {'port': cls.http_server10_port,
+                                   'protocolVersion': 'HTTP1',
+                                   'address': 'testServer10'})
+               ])
+        cls.EA2 = cls.routers[-1]
+        cls.EA2.listener = cls.EA2.addresses[0]
 
+        cls.INT_A.wait_address('EA1')
+        cls.INT_A.wait_address('EA2')
 
-    def test_01_load(self):
+    def test_01_streaming(self):
         """
-        Test multiple clients running as fast as possible
+        Test multiple clients sending streaming messages in parallel
         """
-        server = TestServer(server_port=self.http_server_port,
-                            client_port=self.http_listener_port,
-                            tests=self.TESTS)
+        TESTS_11 = {
+            "PUT": [
+                (RequestMsg("PUT", "/PUT/streaming_test",
+                            headers={
+                                "Transfer-encoding": "chunked",
+                                "Content-Type": "text/plain;charset=utf-8"
+                            },
+
+                            # Note: due to DISPATCH-1791 the test cannot send
+                            # messages of length > Q2 without stalling. Until
+                            # this bug is fixed use a smaller message body
+                            # length:
+                            #
+                            #body=b'aBcDe\r\n' + b'1' * 0xabcde + b'\r\n'
+                            #+ b'a9B8C\r\n' + b'2' * 0xa9b8c + b'\r\n'
+                            #+ b'0\r\n\r\n'),
+
+                            body=b'FFFF\r\n' + b'1' * 0xFFFE + b'X\r\n'
+                            + b'0\r\n\r\n'),
+
+                 ResponseMsg(201, reason="Created",
+                             headers={"Response-Header": "data",
+                                      "Content-Length": "0"}),
+                 ResponseValidator(status=201)
+                )],
+
+            "GET": [
+                (RequestMsg("GET", "/GET/test",
+                            headers={"Content-Length": "000"}),
+                 ResponseMsg(200, reason="OK",
+                             headers={
+                                 "transfer-Encoding": "chunked",
+                                 "Content-Type": "text/plain;charset=utf-8"
+                             },
+
+                             # See DISPATCH-1791
+                             body=b'FFFF\r\n' + b'2' * 0xFFFF + b'\r\n'
+                             + b'0\r\n\r\n'),
+
+                             # body=b'20000\r\n' + b'0' * 0x20000 + b'\r\n'
+                             # + b'20019\r\n' + b'1' * 0x20019 + b'\r\n'
+                             # + b'20028\r\n' + b'2' * 0x20028 + b'\r\n'
+                             # + b'20037\r\n' + b'3' * 0x20037 + b'\r\n'
+                             # + b'20046\r\n' + b'4' * 0x20046 + b'\r\n'
+                             # + b'20054\r\n' + b'5' * 0x20054 + b'\r\n'
+                             # + b'20063\r\n' + b'6' * 0x20063 + b'\r\n'
+                             # + b'20072\r\n' + b'7' * 0x20072 + b'\r\n'
+                             # + b'20081\r\n' + b'8' * 0x20081 + b'\r\n'
+                             # + b'20090\r\n' + b'9' * 0x20090 + b'\r\n'
+                             # + b'0\r\n\r\n'),
+                 ResponseValidator(status=200)
+                )],
+        }
+
+        TESTS_10 = {
+            "POST": [
+                # See DISPATCH-1791: once fixed make body length > Q2
+                (RequestMsg("POST", "/POST/streaming_test",
+                            headers={"Header-1": "H" * 2048,
+                                     #"Content-Length": "1048577",
+                                     "Content-Length": "65536",
+                                     "Content-Type": "text/plain;charset=utf-8"},
+                            # body=b'P' * 1048577),
+                            body=b'P' * 65536),
+                 ResponseMsg(201, reason="Created",
+                             headers={"Response-Header": "data",
+                                      "Content-Length": "0"}),
+                 ResponseValidator(status=201)
+                )],
+
+            "GET": [
+                (RequestMsg("GET", "/GET/streaming_test",
+                            headers={"Content-Length": "000"}),
+                 ResponseMsg(200, reason="OK",
+                             #headers={"Content-Length": "999999",
+                             headers={"Content-Length": "65533",
+                                      "Content-Type": "text/plain;charset=utf-8"},
+                             #body=b'G' * 999999),
+                             body=b'G' * 65533),
+                 ResponseValidator(status=200),
+                ),
+                (RequestMsg("GET", "/GET/streaming_test_close",
+                            headers={"Content-Length": "000"}),
+                 ResponseMsg(200, reason="OK",
+                             headers={"Content-Type": "text/plain;charset=utf-8"},
+                             #body=b'C' * 999999),
+                             body=b'C' * 65537),
+                 ResponseValidator(status=200),
+                )],
+        }
+        server11 = TestServer(server_port=self.http_server11_port,
+                              client_port=self.http_listener11_port,
+                              tests=TESTS_11)
+        server10 = TestServer(server_port=self.http_server10_port,
+                              client_port=self.http_listener10_port,
+                              tests=TESTS_10,
+                              handler_cls=RequestHandler10)
+
+        self.EA2.wait_connectors()
 
         clients = []
-        for _ in range(5):
-            clients.append(ThreadedTestClient(self.TESTS,
-                                              self.http_listener_port,
-                                              repeat=2))
+        for _ in range(2):
+            clients.append(ThreadedTestClient(TESTS_11,
+                                              self.http_listener11_port,
+                                              repeat=10))
+            clients.append(ThreadedTestClient(TESTS_10,
+                                              self.http_listener10_port,
+                                              repeat=10))
         for client in clients:
             client.wait()
             self.assertIsNone(client.error)
 
-        # send command to stop the server thread
-        client = ThreadedTestClient({"POST": [(RequestMsg("POST",
-                                                          "/SHUTDOWN",
-                                                          {"Content-Length": "0"}),
-                                               None,
-                                               None)]},
-                                    self.http_listener_port)
-        client.wait()
-        self.assertIsNone(client.error)
-
-        server.wait()
+        server11.wait()
+        server10.wait()
 
 
 if __name__ == '__main__':


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org