You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2020/10/13 15:21:40 UTC
[qpid-dispatch] branch dev-protocol-adaptors updated:
DISPATCH-1744: add multi-hop large msg tests
This is an automated email from the ASF dual-hosted git repository.
kgiusti pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors by this push:
new ae6d2c2 DISPATCH-1744: add multi-hop large msg tests
ae6d2c2 is described below
commit ae6d2c29101a4feb860ac24ffc10837f143700b1
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Wed Oct 7 09:50:53 2020 -0400
DISPATCH-1744: add multi-hop large msg tests
---
src/adaptors/http1/http1_adaptor.c | 7 +-
src/adaptors/http1/http1_codec.c | 3 +-
src/adaptors/http1/http1_private.h | 3 +
src/adaptors/http1/http1_server.c | 17 +-
tests/system_tests_http1_adaptor.py | 300 ++++++++++++++++++++----------------
5 files changed, 179 insertions(+), 151 deletions(-)
diff --git a/src/adaptors/http1/http1_adaptor.c b/src/adaptors/http1/http1_adaptor.c
index 48e9326..4663385 100644
--- a/src/adaptors/http1/http1_adaptor.c
+++ b/src/adaptors/http1/http1_adaptor.c
@@ -30,9 +30,6 @@
// to HTTP endpoint processing.
//
-// for debug: will dump raw buffer content to stdout if true
-#define HTTP1_DUMP_BUFFERS false
-
#define RAW_BUFFER_BATCH 16
@@ -384,8 +381,8 @@ void qdr_http1_free_written_buffers(qdr_http1_connection_t *hconn)
if (HTTP1_DUMP_BUFFERS) {
char *ptr = (char*) buffers[i].bytes;
int len = (int) buffers[i].size;
- fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d\n value='%.*s'\n",
- hconn->conn_id, (void*)ptr, len, len, ptr);
+ fprintf(stdout, "\n[C%"PRIu64"] Raw Written: Ptr=%p len=%d c=%d o=%d\n value='%.*s'\n",
+ hconn->conn_id, (void*)ptr, len, buffers[i].capacity, buffers[i].offset, len, ptr);
fflush(stdout);
}
diff --git a/src/adaptors/http1/http1_codec.c b/src/adaptors/http1/http1_codec.c
index d1fbea9..7b63937 100644
--- a/src/adaptors/http1/http1_codec.c
+++ b/src/adaptors/http1/http1_codec.c
@@ -1087,10 +1087,11 @@ static bool parse_body_chunked_data(h1_codec_connection_t *conn, struct decoder_
decoder->chunk_length -= skipped;
body_ptr->remaining += skipped;
+ consume_body_data(conn, false);
+
if (decoder->chunk_length == 0) {
// end of chunk
decoder->chunk_state = HTTP1_CHUNK_HEADER;
- consume_body_data(conn, false);
}
return !!rptr->remaining;
diff --git a/src/adaptors/http1/http1_private.h b/src/adaptors/http1/http1_private.h
index d82e421..8e29ae2 100644
--- a/src/adaptors/http1/http1_private.h
+++ b/src/adaptors/http1/http1_private.h
@@ -34,6 +34,9 @@
#include <qpid/dispatch/message.h>
#include "adaptors/http_common.h"
+// for debug: will dump I/O buffer content to stdout if true
+#define HTTP1_DUMP_BUFFERS false
+
typedef struct qdr_http1_out_data_t qdr_http1_out_data_t;
typedef struct qdr_http1_out_data_fifo_t qdr_http1_out_data_fifo_t;
typedef struct qdr_http1_request_base_t qdr_http1_request_base_t;
diff --git a/src/adaptors/http1/http1_server.c b/src/adaptors/http1/http1_server.c
index 2d11600..69df490 100644
--- a/src/adaptors/http1/http1_server.c
+++ b/src/adaptors/http1/http1_server.c
@@ -27,9 +27,6 @@
// connection is terminated at an HTTP server, not an HTTP client.
//
-// for debug: dump raw buffers to stdout if true
-#define HTTP1_DUMP_BUFFERS false
-
//
// State for a single response message arriving via the raw connection. This
@@ -1107,6 +1104,7 @@ static uint64_t _send_request_headers(_server_request_t *hreq, qd_message_t *msg
uint64_t outcome = 0;
assert(!hreq->base.lib_rs);
+ assert(qd_message_check_depth(msg, QD_DEPTH_PROPERTIES) == QD_MESSAGE_DEPTH_OK);
// method is passed in the SUBJECT field
qd_iterator_t *method_iter = qd_message_field_iterator(msg, QD_FIELD_SUBJECT);
@@ -1220,19 +1218,6 @@ static uint64_t _encode_request_message(_server_request_t *hreq)
{
qdr_http1_connection_t *hconn = hreq->base.hconn;
qd_message_t *msg = qdr_delivery_message(hreq->request_dlv);
- qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY);
-
- if (status == QD_MESSAGE_DEPTH_INCOMPLETE)
- return 0;
-
- if (status == QD_MESSAGE_DEPTH_INVALID) {
- qd_log(qdr_http1_adaptor->log, QD_LOG_WARNING,
- "[C%"PRIu64"][L%"PRIu64"] body data depth check failed",
- hconn->conn_id, hconn->out_link_id);
- return PN_REJECTED;
- }
-
- assert(status == QD_MESSAGE_DEPTH_OK);
if (!hreq->headers_encoded) {
uint64_t outcome = _send_request_headers(hreq, msg);
diff --git a/tests/system_tests_http1_adaptor.py b/tests/system_tests_http1_adaptor.py
index dbe1627..2f9ea1f 100644
--- a/tests/system_tests_http1_adaptor.py
+++ b/tests/system_tests_http1_adaptor.py
@@ -164,9 +164,6 @@ class RequestHandler(BaseHTTPRequestHandler):
def log_request(self, code=None, size=None):
pass
- def log_error(self, format=None, *args):
- pass
-
def log_message(self, format=None, *args):
pass
@@ -187,9 +184,12 @@ class RequestHandler(BaseHTTPRequestHandler):
body = b''
while True:
header = self.rfile.readline().strip().split(b';')[0]
- data = self.rfile.readline().rstrip()
- body += data
- if int(header) == 0:
+ hlen = int(header, base=16)
+ if hlen > 0:
+ data = self.rfile.read(hlen + 2) #+\r\n
+ body += data[:-2]
+ else:
+ self.rfile.readline() # discard last \r\n
break;
return body
return self.rfile.read()
@@ -247,7 +247,8 @@ class TestServer(object):
client.putrequest("POST", "/SHUTDOWN")
client.putheader("Content-Length", "0")
client.endheaders()
- client.getresponse()
+ # 13 == len('Server Closed')
+ client.getresponse().read(13)
client.close()
self._thread.join(timeout=TIMEOUT)
if self._server:
@@ -274,26 +275,25 @@ class ThreadedTestClient(object):
for loop in range(self._repeat):
for op, tests in self._tests.items():
for req, _, val in tests:
- self._logger.log("TestClient sending request")
+ self._logger.log("TestClient sending %s %s request" % (op, req.target))
req.send_request(client)
- self._logger.log("TestClient getting response")
+ self._logger.log("TestClient getting %s response" % op)
rsp = client.getresponse()
- self._logger.log("TestClient response received")
+ self._logger.log("TestClient response %s received" % op)
if val:
try:
body = val.check_response(rsp)
except Exception as exc:
- self._logger.log("TestClient response invalid: %s",
- str(exc))
+ self._logger.log("TestClient response invalid: %s"
+ % str(exc))
self.error = "client failed: %s" % str(exc)
return
- if req.method is "BODY" and body != b'':
- self._logger.log("TestClient response invalid: %s",
- "body present!")
+ if req.method == "BODY" and body != b'':
+ self._logger.log("TestClient response invalid: %s"
+ % "body present!")
self.error = "error: body present!"
return
-
client.close()
self._logger.log("TestClient to %s closed" % self._conn_addr)
@@ -493,7 +493,7 @@ class Http1AdaptorOneRouterTest(TestCase):
]
}
- # HTTP/1.0 compliant test cases (no chunked, response length unspecified)
+ # HTTP/1.0 compliant test cases (no chunked, response length unspecified)
TESTS_10 = {
#
# GET
@@ -530,15 +530,6 @@ class Http1AdaptorOneRouterTest(TestCase):
body=b'?')],
ResponseValidator(expect_headers={'Content-Type': "text/plain;charset=utf-8"},
expect_body=b'?')),
-
- # (RequestMsg("GET", "/GET/no_length",
- # headers={"Content-Length": "0"}),
- # ResponseMsg(200, reason="OK",
- # headers={"Content-Type": "text/plain;charset=utf-8",
- # "connection": "close"
- # },
- # body=b'Hi! ' * 1024 + b'X'),
- # ResponseValidator(expect_body=b'Hi! ' * 1024 + b'X')),
],
#
# HEAD
@@ -648,14 +639,10 @@ class Http1AdaptorOneRouterTest(TestCase):
# <clients> <servers>
cls.routers = []
- #cls.http_server11_port = cls.tester.get_port()
- #cls.http_server10_port = cls.tester.get_port()
- #cls.http_listener11_port = cls.tester.get_port()
- #cls.http_listener10_port = cls.tester.get_port()
- cls.http_server11_port = 9090
- cls.http_listener11_port = 8080
- cls.http_server10_port = 9091
- cls.http_listener10_port = 8081
+ cls.http_server11_port = cls.tester.get_port()
+ cls.http_server10_port = cls.tester.get_port()
+ cls.http_listener11_port = cls.tester.get_port()
+ cls.http_listener10_port = cls.tester.get_port()
router('INT.A', 'standalone',
[('httpConnector', {'port': cls.http_server11_port,
@@ -701,7 +688,7 @@ class Http1AdaptorOneRouterTest(TestCase):
except Exception as exc:
self.fail("request failed: %s" % str(exc))
- if req.method is "BODY":
+ if req.method == "BODY":
self.assertEqual(b'', body)
def test_001_get(self):
@@ -753,65 +740,16 @@ class Http1AdaptorOneRouterTest(TestCase):
client.close()
-class Http1AdaptorInteriorTest(TestCase):
+class Http1AdaptorEdge2EdgeTest(TestCase):
"""
- Test an HTTP server connected to an interior router serving multiple HTTP
- clients
+ Test an HTTP servers and clients attached to edge routers separated by an
+ interior router
"""
- TESTS = {
- "PUT": [
- (RequestMsg("PUT", "/PUT/test",
- headers={"Header-1": "Value",
- "Header-2": "Value",
- "Content-Length": "20",
- "Content-Type": "text/plain;charset=utf-8"},
- body=b'!' * 20),
- ResponseMsg(201, reason="Created",
- headers={"Response-Header": "data",
- "Content-Length": "0"}),
- ResponseValidator(status=201)
- )],
-
- "POST": [
- (RequestMsg("POST", "/POST/test",
- headers={"Header-1": "X",
- "Content-Length": "11",
- "Content-Type": "application/x-www-form-urlencoded"},
- body=b'one=1' + b'&two=2'),
- ResponseMsg(200, reason="OK",
- headers={"Response-Header": "whatever",
- "Content-Length": 10},
- body=b'0123456789'),
- ResponseValidator()
- )],
-
- "GET": [
- (RequestMsg("GET", "/GET/test",
- headers={"Content-Length": "000"}),
- ResponseMsg(200, reason="OK",
- headers={"Content-Length": "655",
- "Content-Type": "text/plain;charset=utf-8"},
- body=b'?' * 655),
- ResponseValidator(expect_headers={'Content-Length': '655'},
- expect_body=b'?' * 655)
- )],
-
- "PUT": [
- (RequestMsg("PUT", "/PUT/chunked",
- headers={"Transfer-Encoding": "chunked",
- "Content-Type": "text/plain;charset=utf-8"},
- body=b'16\r\n' + b'!' * 0x16 + b'\r\n'
- + b'0\r\n\r\n'),
- ResponseMsg(204, reason="No Content",
- headers={"Content-Length": "000"}),
- ResponseValidator(status=204)
- )],
- }
@classmethod
def setUpClass(cls):
"""Start a router"""
- super(Http1AdaptorInteriorTest, cls).setUpClass()
+ super(Http1AdaptorEdge2EdgeTest, cls).setUpClass()
def router(name, mode, extra):
config = [
@@ -833,72 +771,176 @@ class Http1AdaptorInteriorTest(TestCase):
# configuration:
# one edge, one interior
#
- # +-------+ +---------+
- # | EA1 |<==>| INT.A |
- # +-------+ +---------+
- # ^ ^
- # | |
- # V V
- # <clients> <server>
+ # +-------+ +---------+ +-------+
+ # | EA1 |<==>| INT.A |<==>| EA2 |
+ # +-------+ +---------+ +-------+
+ # ^ ^
+ # | |
+ # V V
+ # <clients> <servers>
cls.routers = []
- cls.INTA_edge_port = cls.tester.get_port()
- #cls.http_server_port = cls.tester.get_port()
- #cls.http_listener_port = cls.tester.get_port()
- cls.http_server_port = 9090
- cls.http_listener_port = 8080
+ cls.INTA_edge1_port = cls.tester.get_port()
+ cls.INTA_edge2_port = cls.tester.get_port()
+ cls.http_server11_port = cls.tester.get_port()
+ cls.http_listener11_port = cls.tester.get_port()
+ cls.http_server10_port = cls.tester.get_port()
+ cls.http_listener10_port = cls.tester.get_port()
router('INT.A', 'interior',
- [('listener', {'role': 'edge', 'port': cls.INTA_edge_port}),
- ('httpConnector', {'port': cls.http_server_port,
- 'protocolVersion': 'HTTP1',
- 'address': 'testServer'})
+ [('listener', {'role': 'edge', 'port': cls.INTA_edge1_port}),
+ ('listener', {'role': 'edge', 'port': cls.INTA_edge2_port}),
])
cls.INT_A = cls.routers[0]
cls.INT_A.listener = cls.INT_A.addresses[0]
router('EA1', 'edge',
[('connector', {'name': 'uplink', 'role': 'edge',
- 'port': cls.INTA_edge_port}),
- ('httpListener', {'port': cls.http_listener_port,
+ 'port': cls.INTA_edge1_port}),
+ ('httpListener', {'port': cls.http_listener11_port,
'protocolVersion': 'HTTP1',
- 'address': 'testServer'})
+ 'address': 'testServer11'}),
+ ('httpListener', {'port': cls.http_listener10_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer10'})
])
cls.EA1 = cls.routers[1]
cls.EA1.listener = cls.EA1.addresses[0]
- cls.EA1.wait_connectors()
- cls.INT_A.wait_address('EA1')
+ router('EA2', 'edge',
+ [('connector', {'name': 'uplink', 'role': 'edge',
+ 'port': cls.INTA_edge2_port}),
+ ('httpConnector', {'port': cls.http_server11_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer11'}),
+ ('httpConnector', {'port': cls.http_server10_port,
+ 'protocolVersion': 'HTTP1',
+ 'address': 'testServer10'})
+ ])
+ cls.EA2 = cls.routers[-1]
+ cls.EA2.listener = cls.EA2.addresses[0]
+ cls.INT_A.wait_address('EA1')
+ cls.INT_A.wait_address('EA2')
- def test_01_load(self):
+ def test_01_streaming(self):
"""
- Test multiple clients running as fast as possible
+ Test multiple clients sending streaming messages in parallel
"""
- server = TestServer(server_port=self.http_server_port,
- client_port=self.http_listener_port,
- tests=self.TESTS)
+ TESTS_11 = {
+ "PUT": [
+ (RequestMsg("PUT", "/PUT/streaming_test",
+ headers={
+ "Transfer-encoding": "chunked",
+ "Content-Type": "text/plain;charset=utf-8"
+ },
+
+ # Note: due to DISPATCH-1791 the test cannot send
+ # messages of length > Q2 without stalling. Until
+ # this bug is fixed use a smaller message body
+ # length:
+ #
+ #body=b'aBcDe\r\n' + b'1' * 0xabcde + b'\r\n'
+ #+ b'a9B8C\r\n' + b'2' * 0xa9b8c + b'\r\n'
+ #+ b'0\r\n\r\n'),
+
+ body=b'FFFF\r\n' + b'1' * 0xFFFE + b'X\r\n'
+ + b'0\r\n\r\n'),
+
+ ResponseMsg(201, reason="Created",
+ headers={"Response-Header": "data",
+ "Content-Length": "0"}),
+ ResponseValidator(status=201)
+ )],
+
+ "GET": [
+ (RequestMsg("GET", "/GET/test",
+ headers={"Content-Length": "000"}),
+ ResponseMsg(200, reason="OK",
+ headers={
+ "transfer-Encoding": "chunked",
+ "Content-Type": "text/plain;charset=utf-8"
+ },
+
+ # See DISPATCH-1791
+ body=b'FFFF\r\n' + b'2' * 0xFFFF + b'\r\n'
+ + b'0\r\n\r\n'),
+
+ # body=b'20000\r\n' + b'0' * 0x20000 + b'\r\n'
+ # + b'20019\r\n' + b'1' * 0x20019 + b'\r\n'
+ # + b'20028\r\n' + b'2' * 0x20028 + b'\r\n'
+ # + b'20037\r\n' + b'3' * 0x20037 + b'\r\n'
+ # + b'20046\r\n' + b'4' * 0x20046 + b'\r\n'
+ # + b'20054\r\n' + b'5' * 0x20054 + b'\r\n'
+ # + b'20063\r\n' + b'6' * 0x20063 + b'\r\n'
+ # + b'20072\r\n' + b'7' * 0x20072 + b'\r\n'
+ # + b'20081\r\n' + b'8' * 0x20081 + b'\r\n'
+ # + b'20090\r\n' + b'9' * 0x20090 + b'\r\n'
+ # + b'0\r\n\r\n'),
+ ResponseValidator(status=200)
+ )],
+ }
+
+ TESTS_10 = {
+ "POST": [
+ # See DISPATCH-1791: once fixed make body length > Q2
+ (RequestMsg("POST", "/POST/streaming_test",
+ headers={"Header-1": "H" * 2048,
+ #"Content-Length": "1048577",
+ "Content-Length": "65536",
+ "Content-Type": "text/plain;charset=utf-8"},
+ # body=b'P' * 1048577),
+ body=b'P' * 65536),
+ ResponseMsg(201, reason="Created",
+ headers={"Response-Header": "data",
+ "Content-Length": "0"}),
+ ResponseValidator(status=201)
+ )],
+
+ "GET": [
+ (RequestMsg("GET", "/GET/streaming_test",
+ headers={"Content-Length": "000"}),
+ ResponseMsg(200, reason="OK",
+ #headers={"Content-Length": "999999",
+ headers={"Content-Length": "65533",
+ "Content-Type": "text/plain;charset=utf-8"},
+ #body=b'G' * 999999),
+ body=b'G' * 65533),
+ ResponseValidator(status=200),
+ ),
+ (RequestMsg("GET", "/GET/streaming_test_close",
+ headers={"Content-Length": "000"}),
+ ResponseMsg(200, reason="OK",
+ headers={"Content-Type": "text/plain;charset=utf-8"},
+ #body=b'C' * 999999),
+ body=b'C' * 65537),
+ ResponseValidator(status=200),
+ )],
+ }
+ server11 = TestServer(server_port=self.http_server11_port,
+ client_port=self.http_listener11_port,
+ tests=TESTS_11)
+ server10 = TestServer(server_port=self.http_server10_port,
+ client_port=self.http_listener10_port,
+ tests=TESTS_10,
+ handler_cls=RequestHandler10)
+
+ self.EA2.wait_connectors()
clients = []
- for _ in range(5):
- clients.append(ThreadedTestClient(self.TESTS,
- self.http_listener_port,
- repeat=2))
+ for _ in range(2):
+ clients.append(ThreadedTestClient(TESTS_11,
+ self.http_listener11_port,
+ repeat=10))
+ clients.append(ThreadedTestClient(TESTS_10,
+ self.http_listener10_port,
+ repeat=10))
for client in clients:
client.wait()
self.assertIsNone(client.error)
- # send command to stop the server thread
- client = ThreadedTestClient({"POST": [(RequestMsg("POST",
- "/SHUTDOWN",
- {"Content-Length": "0"}),
- None,
- None)]},
- self.http_listener_port)
- client.wait()
- self.assertIsNone(client.error)
-
- server.wait()
+ server11.wait()
+ server10.wait()
if __name__ == '__main__':
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org