You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2019/01/02 19:20:39 UTC
[qpid-dispatch] branch master updated: DISPATCH-1231: correct
credit handling in core client. This closes #431
This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new e54e0f4 DISPATCH-1231: correct credit handling in core client. This closes #431
e54e0f4 is described below
commit e54e0f46a45e9dd825cbe2b1eaf60bee226b1bd4
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Sun Dec 23 19:55:48 2018 -0500
DISPATCH-1231: correct credit handling in core client. This closes #431
---
src/router_core/core_client_api.c | 6 +++---
.../modules/address_lookup_client/lookup_client.c | 4 ++--
src/router_core/modules/edge_router/edge_mgmt.c | 11 ++++-------
.../modules/edge_router/link_route_proxy.c | 11 ++++++++++-
.../modules/edge_router/link_route_proxy.h | 2 +-
.../modules/test_hooks/core_test_hooks.c | 23 ++++++++++++++++------
tests/system_tests_core_client.py | 16 +++++++--------
7 files changed, 44 insertions(+), 29 deletions(-)
diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 7483869..479a45f 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -435,11 +435,11 @@ static void _sender_flow_CT(void *context,
qdrc_client_t *client = (qdrc_client_t *)context;
qdr_core_t *core = client->core;
+ client->tx_credit += available_credit;
qd_log(core->log, QD_LOG_TRACE,
"Core client sender flow granted c=%p credit=%d d=%s",
- client, available_credit, (drain) ? "T" : "F");
- client->tx_credit = available_credit;
- if (available_credit > 0) {
+ client, client->tx_credit, (drain) ? "T" : "F");
+ if (client->tx_credit > 0) {
_flush_send_queue_CT(client);
}
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 048a399..197bd34 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -575,12 +575,12 @@ static void on_state(qdr_core_t *core,
static void on_flow(qdr_core_t *core,
qdrc_client_t *api_client,
void *user_context,
- int available_credit,
+ int more_credit,
bool drain)
{
qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
- client->request_credit = available_credit;
+ client->request_credit += more_credit;
//
// If we have positive credit, process any pending requests
diff --git a/src/router_core/modules/edge_router/edge_mgmt.c b/src/router_core/modules/edge_router/edge_mgmt.c
index 7dd6245..e3f8532 100644
--- a/src/router_core/modules/edge_router/edge_mgmt.c
+++ b/src/router_core/modules/edge_router/edge_mgmt.c
@@ -139,10 +139,7 @@ static void _mgmt_on_state_cb_CT(qdr_core_t *core,
user_context,
(active) ? "active" : "down");
- if (!active) {
- // stop the syncing of link routes by setting credit=0
- qcm_edge_link_route_proxy_flow_CT(core, 0, true);
- }
+ qcm_edge_link_route_proxy_state_CT(core, active);
}
@@ -150,16 +147,16 @@ static void _mgmt_on_state_cb_CT(qdr_core_t *core,
static void _mgmt_on_flow_cb_CT(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
- int available_credit,
+ int more_credit,
bool drain)
{
qd_log(core->log, QD_LOG_TRACE,
"edge mgmt client flow: uc=%p c=%d d=%s",
- user_context, available_credit,
+ user_context, more_credit,
(drain) ? "T" : "F");
qcm_edge_link_route_proxy_flow_CT(core,
- available_credit,
+ more_credit,
drain);
}
diff --git a/src/router_core/modules/edge_router/link_route_proxy.c b/src/router_core/modules/edge_router/link_route_proxy.c
index 79f51ed..eaf6ea9 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.c
+++ b/src/router_core/modules/edge_router/link_route_proxy.c
@@ -430,11 +430,20 @@ static void _on_addr_event(void *context,
// Public API
//
+// called by edge mgmt API when link(s) detach
+void qcm_edge_link_route_proxy_state_CT(qdr_core_t *core, bool active)
+{
+ if (!active)
+ _available_credit = 0; // stop sending pending syncs
+ else if (_available_credit > 0)
+ _sync_interior_proxies(core);
+}
+
// called by the edge mgmt API when credit has been granted:
void qcm_edge_link_route_proxy_flow_CT(qdr_core_t *core, int available_credit, bool drain)
{
- _available_credit = available_credit;
+ _available_credit += available_credit;
_sync_interior_proxies(core);
if (drain) {
_available_credit = 0;
diff --git a/src/router_core/modules/edge_router/link_route_proxy.h b/src/router_core/modules/edge_router/link_route_proxy.h
index 3f8b0ee..f5307c5 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.h
+++ b/src/router_core/modules/edge_router/link_route_proxy.h
@@ -29,5 +29,5 @@
void qcm_edge_link_route_init_CT(qdr_core_t *core);
void qcm_edge_link_route_final_CT(qdr_core_t *core);
void qcm_edge_link_route_proxy_flow_CT(qdr_core_t *core, int available_credit, bool drain);
-
+void qcm_edge_link_route_proxy_state_CT(qdr_core_t *core, bool active);
#endif
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c
index c3d25a3..ed9cc0d 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -474,6 +474,8 @@ static void qdrc_test_hooks_core_endpoint_finalize(test_module_t *module)
// tests. Any changes here may require updates to those tests.
//
+static void _do_send(test_client_t *tc);
+
struct test_client_t {
test_module_t *module;
qdrc_event_subscription_t *conn_events;
@@ -511,6 +513,7 @@ static void _client_on_ack_cb(qdr_core_t *core,
request_context, disposition);
assert((int64_t)request_context < tc->counter);
}
+
static void _client_on_done_cb(qdr_core_t *core,
qdrc_client_t *client,
void *user_context,
@@ -519,16 +522,21 @@ static void _client_on_done_cb(qdr_core_t *core,
{
// the system_tests_core_client.py looks for the following
// log message during the tests
+ test_client_t *tc = (test_client_t *)user_context;
qd_log_level_t level = (error) ? QD_LOG_ERROR : QD_LOG_TRACE;
qd_log(core->log, level,
"client test request done error=%s",
(error) ? error : "None");
+ if (!error && tc->credit > 0) {
+ _do_send(tc);
+ }
}
+// send a single request if credit available
static void _do_send(test_client_t *tc)
{
int rc = 0;
- while (tc->credit > 0) {
+ if (tc->credit > 0) {
qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
@@ -553,7 +561,7 @@ static void _do_send(test_client_t *tc)
++tc->counter;
--tc->credit;
qd_log(tc->module->core->log, QD_LOG_TRACE,
- "client test message sent id=%"PRIi64" c=%d", tc->counter + 1, tc->credit);
+ "client test message sent id=%"PRIi64" c=%d", tc->counter - 1, tc->credit);
}
}
@@ -577,9 +585,12 @@ static void _client_on_flow_cb(qdr_core_t *core, qdrc_client_t *core_client,
qd_log(tc->module->core->log, QD_LOG_TRACE,
"client test on flow c=%d d=%c", available_credit, drain ? 'T' : 'F');
tc->credit = available_credit;
- _do_send(tc);
- if (drain)
- tc->credit = 0;
+ if (drain) {
+ while (tc->credit > 0)
+ _do_send(tc);
+ } else {
+ _do_send(tc);
+ }
}
static void _on_conn_event(void *context, qdrc_event_t type, qdr_connection_t *conn)
@@ -607,7 +618,7 @@ static void _on_conn_event(void *context, qdrc_event_t type, qdr_connection_t *c
tc->core_client = qdrc_client_CT(tc->module->core,
tc->conn,
target,
- 10, // credit window
+ 10, // reply credit window
tc, // user context
_client_on_state_cb,
_client_on_flow_cb);
diff --git a/tests/system_tests_core_client.py b/tests/system_tests_core_client.py
index 83c3338..9030bcd 100644
--- a/tests/system_tests_core_client.py
+++ b/tests/system_tests_core_client.py
@@ -32,11 +32,11 @@ from proton import Endpoint
from proton.handlers import MessagingHandler
from proton.reactor import Container
-# test the request/response core client messaging API These tests rely on
-# enabling the router test hooks, which instantiates a test client (see
-# modules/test_hooks/core_test_hooks)
+# test the request/response core client messaging API
+#
+# These tests rely on enabling the router test hooks, which instantiates a test
+# client (see modules/test_hooks/core_test_hooks) see core_test_hooks.c
-# see core_test_hooks.c
CONTAINER_ID = "org.apache.qpid.dispatch.test_core_client"
TARGET_ADDR = "test_core_client_address"
@@ -53,11 +53,11 @@ class CoreClientAPITest(TestCase):
cls.router = cls.tester.qdrouterd("A", config, cl_args=["-T"])
def test_send_receive(self):
- ts = TestService(self.router.addresses[0], credit=10)
+ ts = TestService(self.router.addresses[0], credit=250)
ts.run()
self.assertTrue(ts.error is None)
- self.assertEqual(10, ts.in_count)
- self.assertEqual(10, ts.out_count)
+ self.assertEqual(250, ts.in_count)
+ self.assertEqual(250, ts.out_count)
def test_credit_starve(self):
ts = TestCreditStarve(self.router.addresses[0])
@@ -72,7 +72,6 @@ class CoreClientAPITest(TestCase):
self.assertTrue(ts.error is None)
self.assertTrue(ts.in_count >= 1)
-
def test_bad_format(self):
ts = TestNoCorrelationId(self.router.addresses[0])
ts.run()
@@ -125,7 +124,6 @@ class TestService(MessagingHandler):
def on_start(self, event):
self.timer = event.reactor.schedule(TIMEOUT, self.Timeout(self))
-
self._conn = event.container.connect(self.address)
def on_link_opening(self, event):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org