You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gm...@apache.org on 2019/01/02 19:20:39 UTC

[qpid-dispatch] branch master updated: DISPATCH-1231: correct credit handling in core client. This closes #431

This is an automated email from the ASF dual-hosted git repository.

gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git


The following commit(s) were added to refs/heads/master by this push:
     new e54e0f4  DISPATCH-1231: correct credit handling in core client. This closes #431
e54e0f4 is described below

commit e54e0f46a45e9dd825cbe2b1eaf60bee226b1bd4
Author: Kenneth Giusti <kg...@apache.org>
AuthorDate: Sun Dec 23 19:55:48 2018 -0500

    DISPATCH-1231: correct credit handling in core client. This closes #431
---
 src/router_core/core_client_api.c                  |  6 +++---
 .../modules/address_lookup_client/lookup_client.c  |  4 ++--
 src/router_core/modules/edge_router/edge_mgmt.c    | 11 ++++-------
 .../modules/edge_router/link_route_proxy.c         | 11 ++++++++++-
 .../modules/edge_router/link_route_proxy.h         |  2 +-
 .../modules/test_hooks/core_test_hooks.c           | 23 ++++++++++++++++------
 tests/system_tests_core_client.py                  | 16 +++++++--------
 7 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 7483869..479a45f 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -435,11 +435,11 @@ static void _sender_flow_CT(void *context,
     qdrc_client_t *client = (qdrc_client_t *)context;
     qdr_core_t *core = client->core;
 
+    client->tx_credit += available_credit;
     qd_log(core->log, QD_LOG_TRACE,
            "Core client sender flow granted c=%p credit=%d d=%s",
-           client, available_credit, (drain) ? "T" : "F");
-    client->tx_credit = available_credit;
-    if (available_credit > 0) {
+           client, client->tx_credit, (drain) ? "T" : "F");
+    if (client->tx_credit > 0) {
         _flush_send_queue_CT(client);
     }
 
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 048a399..197bd34 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -575,12 +575,12 @@ static void on_state(qdr_core_t    *core,
 static void on_flow(qdr_core_t    *core,
                     qdrc_client_t *api_client,
                     void          *user_context,
-                    int            available_credit,
+                    int            more_credit,
                     bool           drain)
 {
     qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
 
-    client->request_credit = available_credit;
+    client->request_credit += more_credit;
 
     //
     // If we have positive credit, process any pending requests
diff --git a/src/router_core/modules/edge_router/edge_mgmt.c b/src/router_core/modules/edge_router/edge_mgmt.c
index 7dd6245..e3f8532 100644
--- a/src/router_core/modules/edge_router/edge_mgmt.c
+++ b/src/router_core/modules/edge_router/edge_mgmt.c
@@ -139,10 +139,7 @@ static void _mgmt_on_state_cb_CT(qdr_core_t    *core,
            user_context,
            (active) ? "active" : "down");
 
-    if (!active) {
-        // stop the syncing of link routes by setting credit=0
-        qcm_edge_link_route_proxy_flow_CT(core, 0, true);
-    }
+    qcm_edge_link_route_proxy_state_CT(core, active);
 }
 
 
@@ -150,16 +147,16 @@ static void _mgmt_on_state_cb_CT(qdr_core_t    *core,
 static void _mgmt_on_flow_cb_CT(qdr_core_t    *core,
                                 qdrc_client_t *client,
                                 void          *user_context,
-                                int            available_credit,
+                                int            more_credit,
                                 bool           drain)
 {
     qd_log(core->log, QD_LOG_TRACE,
            "edge mgmt client flow: uc=%p c=%d d=%s",
-           user_context, available_credit,
+           user_context, more_credit,
            (drain) ? "T" : "F");
 
     qcm_edge_link_route_proxy_flow_CT(core,
-                                      available_credit,
+                                      more_credit,
                                       drain);
 }
 
diff --git a/src/router_core/modules/edge_router/link_route_proxy.c b/src/router_core/modules/edge_router/link_route_proxy.c
index 79f51ed..eaf6ea9 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.c
+++ b/src/router_core/modules/edge_router/link_route_proxy.c
@@ -430,11 +430,20 @@ static void _on_addr_event(void          *context,
 // Public API
 //
 
+// called by edge mgmt API when link(s) detach
+void qcm_edge_link_route_proxy_state_CT(qdr_core_t *core, bool active)
+{
+    if (!active)
+        _available_credit = 0;  // stop sending pending syncs
+    else if (_available_credit > 0)
+        _sync_interior_proxies(core);
+}
+
 
 // called by the edge mgmt API when credit has been granted:
 void qcm_edge_link_route_proxy_flow_CT(qdr_core_t *core, int available_credit, bool drain)
 {
-    _available_credit = available_credit;
+    _available_credit += available_credit;
     _sync_interior_proxies(core);
     if (drain) {
         _available_credit = 0;
diff --git a/src/router_core/modules/edge_router/link_route_proxy.h b/src/router_core/modules/edge_router/link_route_proxy.h
index 3f8b0ee..f5307c5 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.h
+++ b/src/router_core/modules/edge_router/link_route_proxy.h
@@ -29,5 +29,5 @@
 void qcm_edge_link_route_init_CT(qdr_core_t *core);
 void qcm_edge_link_route_final_CT(qdr_core_t *core);
 void qcm_edge_link_route_proxy_flow_CT(qdr_core_t *core, int available_credit, bool drain);
-
+void qcm_edge_link_route_proxy_state_CT(qdr_core_t *core, bool active);
 #endif
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c
index c3d25a3..ed9cc0d 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -474,6 +474,8 @@ static void qdrc_test_hooks_core_endpoint_finalize(test_module_t *module)
 // tests.  Any changes here may require updates to those tests.
 //
 
+static void _do_send(test_client_t *tc);
+
 struct test_client_t {
     test_module_t             *module;
     qdrc_event_subscription_t *conn_events;
@@ -511,6 +513,7 @@ static void _client_on_ack_cb(qdr_core_t    *core,
            request_context, disposition);
     assert((int64_t)request_context < tc->counter);
 }
+
 static void _client_on_done_cb(qdr_core_t    *core,
                                qdrc_client_t *client,
                                void          *user_context,
@@ -519,16 +522,21 @@ static void _client_on_done_cb(qdr_core_t    *core,
 {
     // the system_tests_core_client.py looks for the following
     // log message during the tests
+    test_client_t *tc = (test_client_t *)user_context;
     qd_log_level_t level = (error) ? QD_LOG_ERROR : QD_LOG_TRACE;
     qd_log(core->log, level,
            "client test request done error=%s",
            (error) ? error : "None");
+    if (!error && tc->credit > 0) {
+        _do_send(tc);
+    }
 }
 
+// send a single request if credit available
 static void _do_send(test_client_t *tc)
 {
     int rc = 0;
-    while (tc->credit > 0) {
+    if (tc->credit > 0) {
 
         qd_composed_field_t *props = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
         qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
@@ -553,7 +561,7 @@ static void _do_send(test_client_t *tc)
         ++tc->counter;
         --tc->credit;
         qd_log(tc->module->core->log, QD_LOG_TRACE,
-               "client test message sent id=%"PRIi64" c=%d", tc->counter + 1, tc->credit);
+               "client test message sent id=%"PRIi64" c=%d", tc->counter - 1, tc->credit);
     }
 }
 
@@ -577,9 +585,12 @@ static void _client_on_flow_cb(qdr_core_t *core, qdrc_client_t *core_client,
     qd_log(tc->module->core->log, QD_LOG_TRACE,
            "client test on flow c=%d d=%c", available_credit, drain ? 'T' : 'F');
     tc->credit = available_credit;
-    _do_send(tc);
-    if (drain)
-        tc->credit = 0;
+    if (drain) {
+        while (tc->credit > 0)
+            _do_send(tc);
+    } else {
+        _do_send(tc);
+    }
 }
 
 static void _on_conn_event(void *context, qdrc_event_t type, qdr_connection_t *conn)
@@ -607,7 +618,7 @@ static void _on_conn_event(void *context, qdrc_event_t type, qdr_connection_t *c
             tc->core_client = qdrc_client_CT(tc->module->core,
                                              tc->conn,
                                              target,
-                                             10,   // credit window
+                                             10,   // reply credit window
                                              tc,   // user context
                                              _client_on_state_cb,
                                              _client_on_flow_cb);
diff --git a/tests/system_tests_core_client.py b/tests/system_tests_core_client.py
index 83c3338..9030bcd 100644
--- a/tests/system_tests_core_client.py
+++ b/tests/system_tests_core_client.py
@@ -32,11 +32,11 @@ from proton import Endpoint
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
 
-# test the request/response core client messaging API These tests rely on
-# enabling the router test hooks, which instantiates a test client (see
-# modules/test_hooks/core_test_hooks)
+# test the request/response core client messaging API
+#
+# These tests rely on enabling the router test hooks, which instantiates a test
+# client (see modules/test_hooks/core_test_hooks) see core_test_hooks.c
 
-# see core_test_hooks.c
 CONTAINER_ID = "org.apache.qpid.dispatch.test_core_client"
 TARGET_ADDR = "test_core_client_address"
 
@@ -53,11 +53,11 @@ class CoreClientAPITest(TestCase):
         cls.router = cls.tester.qdrouterd("A", config, cl_args=["-T"])
 
     def test_send_receive(self):
-        ts = TestService(self.router.addresses[0], credit=10)
+        ts = TestService(self.router.addresses[0], credit=250)
         ts.run()
         self.assertTrue(ts.error is None)
-        self.assertEqual(10, ts.in_count)
-        self.assertEqual(10, ts.out_count)
+        self.assertEqual(250, ts.in_count)
+        self.assertEqual(250, ts.out_count)
 
     def test_credit_starve(self):
         ts = TestCreditStarve(self.router.addresses[0])
@@ -72,7 +72,6 @@ class CoreClientAPITest(TestCase):
         self.assertTrue(ts.error is None)
         self.assertTrue(ts.in_count >= 1)
 
-
     def test_bad_format(self):
         ts = TestNoCorrelationId(self.router.addresses[0])
         ts.run()
@@ -125,7 +124,6 @@ class TestService(MessagingHandler):
 
     def on_start(self, event):
         self.timer = event.reactor.schedule(TIMEOUT, self.Timeout(self))
-
         self._conn = event.container.connect(self.address)
 
     def on_link_opening(self, event):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org