You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2020/10/23 18:20:38 UTC
[qpid-dispatch] branch dev-protocol-adaptors-2 updated:
DISPATCH-1811: Check credit before sending delivery. This fix ensures that
the edge router case works. Also added system tests
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch dev-protocol-adaptors-2
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/dev-protocol-adaptors-2 by this push:
new 80dd51f DISPATCH-1811: Check credit before sending delivery. This fix ensures that the edge router case works. Also added system tests
80dd51f is described below
commit 80dd51fc74e009e40a464e7940c6ebd233fdb583
Author: Ganesh Murthy <gm...@apache.org>
AuthorDate: Fri Oct 23 14:20:09 2020 -0400
DISPATCH-1811: Check credit before sending delivery. This fix ensures that the edge router case works. Also added system tests
---
src/adaptors/http2/http2_adaptor.c | 39 ++++---
src/adaptors/http2/http2_adaptor.h | 1 +
tests/http2_server.py | 6 +-
tests/system_tests_http2.py | 211 +++++++++++++++++++++++++++++++++++--
4 files changed, 231 insertions(+), 26 deletions(-)
diff --git a/src/adaptors/http2/http2_adaptor.c b/src/adaptors/http2/http2_adaptor.c
index 4d1cd14..21277e6 100644
--- a/src/adaptors/http2/http2_adaptor.c
+++ b/src/adaptors/http2/http2_adaptor.c
@@ -626,27 +626,23 @@ static void compose_and_deliver(qdr_http2_stream_data_t *stream_data, qd_compose
}
qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Initiating qdr_link_deliver in compose_and_deliver", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity);
- stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
- qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Routed delivery dlv:%lx", conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv);
-
- if (stream_data->in_dlv) {
+ if (!stream_data->in_dlv && stream_data->in_link_credit > 0) {
+ stream_data->in_dlv = qdr_link_deliver(stream_data->in_link, stream_data->message, 0, false, 0, 0, 0, 0);
+ qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"][S%"PRId32"][L%"PRIu64"] Routed delivery in qdr_http_flow dlv:%lx", stream_data->session_data->conn->conn_id, stream_data->stream_id, stream_data->in_link->identity, (long) stream_data->in_dlv);
qdr_delivery_set_context(stream_data->in_dlv, stream_data);
- qdr_delivery_decref(http2_adaptor->core, stream_data->in_dlv, "http2_adaptor - release protection of return from deliver");
- } else {
- //
- // If there is no delivery, the message is now and will always be unroutable because there is no address.
- //
- qd_message_set_discard(qdr_delivery_message(stream_data->in_dlv), true);
- if (receive_complete) {
- qd_message_free(qdr_delivery_message(stream_data->in_dlv));
- }
+ qdr_delivery_decref(http2_adaptor->core, stream_data->in_dlv, "http2_adaptor - qdr_http_flow - release protection of return from deliver");
+ stream_data->in_link_credit -= 1;
}
-
-
}
static bool route_delivery(qdr_http2_stream_data_t *stream_data, bool receive_complete)
{
+ if (stream_data->in_dlv)
+ return false;
+
+ if (stream_data->in_link_credit == 0)
+ return false;
+
qd_composed_field_t *header_and_props = 0;
qdr_http2_connection_t *conn = stream_data->session_data->conn;
@@ -1114,6 +1110,18 @@ static void qdr_http_detach(void *context, qdr_link_t *link, qdr_error_t *error,
static void qdr_http_flow(void *context, qdr_link_t *link, int credit)
{
+ if (credit > 0) {
+ qdr_http2_stream_data_t *stream_data = qdr_link_get_context(link);
+ if (! stream_data)
+ return;
+ stream_data->in_link_credit += credit;
+ if (route_delivery(stream_data, qd_message_receive_complete(stream_data->message))) {
+ qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] qdr_http_flow, delivery routed successfully", stream_data->session_data->conn->conn_id);
+ }
+ else {
+ qd_log(http2_adaptor->log_source, QD_LOG_TRACE, "[C%"PRIu64"] qdr_http_flow delivery not routed", stream_data->session_data->conn->conn_id);
+ }
+ }
}
@@ -1531,7 +1539,6 @@ static uint64_t qdr_http_deliver(void *context, qdr_link_t *link, qdr_delivery_t
0,
&(stream_data->incoming_id));
qdr_link_set_context(stream_data->in_link, stream_data);
-
qd_log(http2_adaptor->log_source, QD_LOG_DEBUG, "[C%"PRIu64"] - qdr_http_deliver - delivery for stream_dispatcher", conn->conn_id);
}
else if (stream_data) {
diff --git a/src/adaptors/http2/http2_adaptor.h b/src/adaptors/http2/http2_adaptor.h
index 71b89da..47d35f6 100644
--- a/src/adaptors/http2/http2_adaptor.h
+++ b/src/adaptors/http2/http2_adaptor.h
@@ -90,6 +90,7 @@ struct qdr_http2_stream_data_t {
qd_message_body_data_result_t next_body_data_result;
int curr_body_data_qd_buff_offset;
int body_data_buff_count;
+ int in_link_credit; // provided by router
int32_t stream_id;
size_t qd_buffers_to_send;
qd_http2_stream_status_t status;
diff --git a/tests/http2_server.py b/tests/http2_server.py
index 889ebae..1f1ee5f 100644
--- a/tests/http2_server.py
+++ b/tests/http2_server.py
@@ -45,9 +45,9 @@ async def delete_myinfo(id): # noqa
@app.route('/myinfo', methods=['GET', 'POST', 'PUT'])
async def create_myinfo():
form = await request.form
- name = form['fname']
- age = form['lname']
- message = "Success! Your first name is %s, last name is %s" % (name, age)
+ fname = form['fname']
+ lname = form['lname']
+ message = "Success! Your first name is %s, last name is %s" % (fname, lname)
return message
def large_string(length):
diff --git a/tests/system_tests_http2.py b/tests/system_tests_http2.py
index 3dd0664..3b7501a 100644
--- a/tests/system_tests_http2.py
+++ b/tests/system_tests_http2.py
@@ -42,16 +42,20 @@ def curl_available():
return False
def quart_available():
+ """
+ Checks if quart version is greater than 0.13
+ """
popen_args = ['quart', '--version']
try:
process = Process(popen_args,
- name='curl_check',
+ name='quart_check',
stdout=PIPE,
expect=None,
universal_newlines=True)
out = process.communicate()[0]
parts = out.split(".")
- if int(parts[1]) >= 13:
+ major_version = parts[0]
+ if int(major_version[-1]) > 0 or int(parts[1]) >= 13:
return True
return False
except Exception as e:
@@ -87,8 +91,8 @@ class Http2TestBase(TestCase):
class CommonHttp2Tests():
"""
- The tests in this class are run by both Http2TestOneRouter and
- Http2TestTwoRouter
+ Common Base class containing all tests. These tests are run by all
+ topologies of routers.
"""
@SkipIfNeeded(skip_test(), "Python 3.7 or greater, Quart 0.13.0 or greater and curl needed to run http2 tests")
# Tests the HTTP2 head request
@@ -190,10 +194,10 @@ class CommonHttp2Tests():
self.assertTrue(passed)
-class Http2TestOneRouter(Http2TestBase, CommonHttp2Tests):
+class Http2TestOneStandaloneRouter(Http2TestBase, CommonHttp2Tests):
@classmethod
def setUpClass(cls):
- super(Http2TestOneRouter, cls).setUpClass()
+ super(Http2TestOneStandaloneRouter, cls).setUpClass()
if skip_test():
return
cls.http2_server_name = "http2_server"
@@ -217,6 +221,57 @@ class Http2TestOneRouter(Http2TestBase, CommonHttp2Tests):
cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True)
+class Http2TestOneEdgeRouter(Http2TestBase, CommonHttp2Tests):
+ @classmethod
+ def setUpClass(cls):
+ super(Http2TestOneEdgeRouter, cls).setUpClass()
+ if skip_test():
+ return
+ cls.http2_server_name = "http2_server"
+ os.environ["QUART_APP"] = "http2server:app"
+ os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
+ cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
+ listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
+ py_string='python3',
+ server_file="http2_server.py")
+ name = "http2-test-router"
+ config = Qdrouterd.Config([
+ ('router', {'mode': 'edge', 'id': 'QDR'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+
+ ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+ ('httpConnector',
+ {'port': os.getenv('SERVER_LISTEN_PORT'), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'})
+ ])
+ cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True)
+
+class Http2TestOneInteriorRouter(Http2TestBase, CommonHttp2Tests):
+ @classmethod
+ def setUpClass(cls):
+ super(Http2TestOneInteriorRouter, cls).setUpClass()
+ if skip_test():
+ return
+ cls.http2_server_name = "http2_server"
+ os.environ["QUART_APP"] = "http2server:app"
+ os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
+ cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
+ listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
+ py_string='python3',
+ server_file="http2_server.py")
+ name = "http2-test-router"
+ config = Qdrouterd.Config([
+ ('router', {'mode': 'interior', 'id': 'QDR'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+
+ ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+ ('httpConnector',
+ {'port': os.getenv('SERVER_LISTEN_PORT'), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'})
+ ])
+ cls.router_qdra = cls.tester.qdrouterd(name, config, wait=True)
class Http2TestTwoRouter(Http2TestBase, CommonHttp2Tests):
@classmethod
@@ -260,4 +315,146 @@ class Http2TestTwoRouter(Http2TestBase, CommonHttp2Tests):
cls.router_qdra.wait_router_connected('QDR.B')
cls.router_qdrb.wait_router_connected('QDR.A')
- sleep(2)
\ No newline at end of file
+ sleep(2)
+
+
+
+class Http2TestEdgeInteriorRouter(Http2TestBase, CommonHttp2Tests):
+ """
+ The interior router connects to the HTTP2 server and the curl client
+ connects to the edge router.
+ """
+ @classmethod
+ def setUpClass(cls):
+ super(Http2TestEdgeInteriorRouter, cls).setUpClass()
+ if skip_test():
+ return
+ cls.http2_server_name = "http2_server"
+ os.environ["QUART_APP"] = "http2server:app"
+ os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
+ cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
+ listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
+ py_string='python3',
+ server_file="http2_server.py")
+ inter_router_port = cls.tester.get_port()
+ config_edgea = Qdrouterd.Config([
+ ('router', {'mode': 'edge', 'id': 'EDGE.A'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+ ('httpListener', {'port': cls.tester.get_port(), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+ ('connector', {'name': 'connectorToA', 'role': 'edge',
+ 'port': inter_router_port,
+ 'verifyHostname': 'no'})
+ ])
+
+ config_qdrb = Qdrouterd.Config([
+ ('router', {'mode': 'interior', 'id': 'QDR.A'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+ ('listener', {'role': 'edge', 'port': inter_router_port}),
+ ('httpConnector',
+ {'port': os.getenv('SERVER_LISTEN_PORT'), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'})
+ ])
+
+ cls.router_qdrb = cls.tester.qdrouterd("interior-router", config_qdrb, wait=True)
+ cls.router_qdra = cls.tester.qdrouterd("edge-router", config_edgea)
+ sleep(3)
+
+
+class Http2TestInteriorEdgeRouter(Http2TestBase, CommonHttp2Tests):
+ """
+ The edge router connects to the HTTP2 server and the curl client
+ connects to the interior router.
+ """
+ @classmethod
+ def setUpClass(cls):
+ super(Http2TestInteriorEdgeRouter, cls).setUpClass()
+ if skip_test():
+ return
+ cls.http2_server_name = "http2_server"
+ os.environ["QUART_APP"] = "http2server:app"
+ os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
+ cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
+ listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
+ py_string='python3',
+ server_file="http2_server.py")
+ inter_router_port = cls.tester.get_port()
+ config_edge = Qdrouterd.Config([
+ ('router', {'mode': 'edge', 'id': 'EDGE.A'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+ ('httpConnector',
+ {'port': os.getenv('SERVER_LISTEN_PORT'), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+ ('connector', {'name': 'connectorToA', 'role': 'edge',
+ 'port': inter_router_port,
+ 'verifyHostname': 'no'})
+ ])
+
+ config_qdra = Qdrouterd.Config([
+ ('router', {'mode': 'interior', 'id': 'QDR.A'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+ ('listener', {'role': 'edge', 'port': inter_router_port}),
+ ('httpListener',
+ {'port': cls.tester.get_port(), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+ ])
+
+ cls.router_qdra = cls.tester.qdrouterd("interior-router", config_qdra, wait=True)
+ cls.router_qdrb = cls.tester.qdrouterd("edge-router", config_edge)
+ sleep(3)
+
+
+
+class Http2TestEdgeToEdgeViaInteriorRouter(Http2TestBase, CommonHttp2Tests):
+ """
+ The edge router connects to the HTTP2 server and the curl client
+ connects to another edge router. The two edge routers are connected
+ via an interior router.
+ """
+ @classmethod
+ def setUpClass(cls):
+ super(Http2TestEdgeToEdgeViaInteriorRouter, cls).setUpClass()
+ if skip_test():
+ return
+ cls.http2_server_name = "http2_server"
+ os.environ["QUART_APP"] = "http2server:app"
+ os.environ['SERVER_LISTEN_PORT'] = str(cls.tester.get_port())
+ cls.http2_server = cls.tester.http2server(name=cls.http2_server_name,
+ listen_port=int(os.getenv('SERVER_LISTEN_PORT')),
+ py_string='python3',
+ server_file="http2_server.py")
+ inter_router_port = cls.tester.get_port()
+ config_edge_b = Qdrouterd.Config([
+ ('router', {'mode': 'edge', 'id': 'EDGE.A'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+ ('httpConnector',
+ {'port': os.getenv('SERVER_LISTEN_PORT'), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+ ('connector', {'name': 'connectorToA', 'role': 'edge',
+ 'port': inter_router_port,
+ 'verifyHostname': 'no'})
+ ])
+
+ config_qdra = Qdrouterd.Config([
+ ('router', {'mode': 'interior', 'id': 'QDR.A'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal', 'host': '0.0.0.0'}),
+ ('listener', {'role': 'edge', 'port': inter_router_port}),
+ ])
+
+ config_edge_a = Qdrouterd.Config([
+ ('router', {'mode': 'edge', 'id': 'EDGE.B'}),
+ ('listener', {'port': cls.tester.get_port(), 'role': 'normal',
+ 'host': '0.0.0.0'}),
+ ('httpListener',
+ {'port': cls.tester.get_port(), 'address': 'examples',
+ 'host': '127.0.0.1', 'protocolVersion': 'HTTP2'}),
+ ('connector', {'name': 'connectorToA', 'role': 'edge',
+ 'port': inter_router_port,
+ 'verifyHostname': 'no'})
+ ])
+
+ cls.interior_qdr = cls.tester.qdrouterd("interior-router", config_qdra,
+ wait=True)
+ cls.router_qdra = cls.tester.qdrouterd("edge-router-a", config_edge_a)
+ cls.router_qdrb = cls.tester.qdrouterd("edge-router-b", config_edge_b)
+ sleep(5)
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org