You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/12/06 22:45:27 UTC

qpid-dispatch git commit: DISPATCH-1150: add core client request time out

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 7a9378d58 -> 7ec199d46


DISPATCH-1150: add core client request time out


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7ec199d4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7ec199d4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7ec199d4

Branch: refs/heads/master
Commit: 7ec199d46ea6cb268c27977ccd0a00102da3b371
Parents: 7a9378d
Author: Kenneth Giusti <kg...@apache.org>
Authored: Thu Dec 6 13:22:35 2018 -0500
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Thu Dec 6 17:21:43 2018 -0500

----------------------------------------------------------------------
 src/router_core/core_client_api.c               | 28 ++++++++++++---
 src/router_core/core_client_api.h               |  7 ++--
 .../address_lookup_client/lookup_client.c       |  3 +-
 src/router_core/modules/edge_router/edge_mgmt.c |  2 ++
 src/router_core/modules/edge_router/edge_mgmt.h |  4 +++
 .../modules/edge_router/link_route_proxy.c      |  4 ++-
 .../modules/test_hooks/core_test_hooks.c        |  9 +++--
 tests/system_test.py                            |  6 ++++
 tests/system_tests_core_client.py               | 37 ++++++++++++++++++++
 9 files changed, 88 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/core_client_api.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 731cd77..4967aec 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -41,6 +41,7 @@ struct qdrc_client_request_t {
     qd_iterator_t       *correlation_key;
     qd_hash_handle_t    *hash_handle;
     qdr_delivery_t      *delivery;
+    qdr_core_timer_t    *timer;
 
     qd_composed_field_t *app_properties;
     qd_composed_field_t *body;
@@ -119,6 +120,7 @@ static void _free_request_CT(qdrc_client_t *client,
                              const char *error);
 static qd_message_t *_create_message_CT(qdrc_client_t *client,
                                         qdrc_client_request_t *req);
+static void _timer_expired(qdr_core_t *core, void *context);
 
 
 static qdrc_endpoint_desc_t sender_endpoint = {
@@ -229,12 +231,13 @@ int qdrc_client_request_CT(qdrc_client_t                 *client,
                            void                          *request_context,
                            qd_composed_field_t           *app_properties,
                            qd_composed_field_t           *body,
+                           uint32_t                       timeout,
                            qdrc_client_on_reply_CT_t      on_reply_cb,
                            qdrc_client_on_ack_CT_t        on_ack_cb,
                            qdrc_client_request_done_CT_t  done_cb)
 {
     qd_log(client->core->log, QD_LOG_TRACE,
-           "New core client request created c=%p, rc=%"PRIuPTR")",
+           "New core client request created c=%p, rc=%"PRIuPTR,
            client, request_context);
 
     qdrc_client_request_t *req = new_qdrc_client_request_t();
@@ -246,6 +249,10 @@ int qdrc_client_request_CT(qdrc_client_t                 *client,
     req->on_reply_cb    = on_reply_cb;
     req->on_ack_cb      = on_ack_cb;
     req->done_cb        = done_cb;
+    if (timeout) {
+        req->timer = qdr_core_timer_CT(client->core, _timer_expired, req);
+        qdr_core_timer_schedule_CT(client->core, req->timer, timeout);
+    }
 
     _send_request_CT(client, req);
     return 0;
@@ -287,7 +294,7 @@ static void _flush_send_queue_CT(qdrc_client_t *client)
         req->on_send_queue = false;
 
         qd_log(client->core->log, QD_LOG_TRACE,
-               "Core client request sent c=%p, rc=%"PRIuPTR" dlv=%p cid=%s)",
+               "Core client request sent c=%p, rc=%"PRIuPTR" dlv=%p cid=%s",
                client, req->req_context, req->delivery,
                *req->correlation_id ? req->correlation_id : "<none>");
 
@@ -313,6 +320,9 @@ static void _free_request_CT(qdrc_client_t *client,
                              qdrc_client_request_t *req,
                              const char *error)
 {
+    if (req->timer) {
+        qdr_core_timer_free_CT(client->core, req->timer);
+    }
     if (req->on_send_queue)
         DEQ_REMOVE_N(SEND_Q, client->send_queue, req);
     if (req->on_unsettled_list)
@@ -347,8 +357,9 @@ static void _free_request_CT(qdrc_client_t *client,
     }
 
     qd_log(client->core->log, QD_LOG_TRACE,
-           "Freeing core client request c=%p, rc=%"PRIuPTR")",
-           client, req->req_context);
+           "Freeing core client request c=%p, rc=%"PRIuPTR" (%s)",
+           client, req->req_context,
+           error ? error : "request complete");
 
     free_qdrc_client_request_t(req);
 }
@@ -683,3 +694,12 @@ static qd_message_t *_create_message_CT(qdrc_client_t *client,
 
     return message;
 }
+
+
+// a request has timed out
+static void _timer_expired(qdr_core_t *core, void *context)
+{
+    qdrc_client_request_t *req = (qdrc_client_request_t *)context;
+    qdrc_client_t *client = req->client;
+    _free_request_CT(client, req, "Timed out");
+}

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/core_client_api.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_client_api.h b/src/router_core/core_client_api.h
index ff78e72..1d2da4c 100644
--- a/src/router_core/core_client_api.h
+++ b/src/router_core/core_client_api.h
@@ -141,14 +141,14 @@ void qdrc_client_free_CT(qdrc_client_t *client);
  * @param client - as returned by qdrc_client_CT()
  * @param request_context - context for this request that will be passed to
  *        callbacks
-
  * @param app_properties - the application properties for the sent message.
  *        Ownership is transferred to this call - the caller must not reference the
  *        composed field on return.
-
  * @param body - the message body for the sent message. Ownership is transferred
  *        to this call - the caller must not reference the composed field on return.
-
+ * @param timeout - abort the request if it does not complete in timeout
+ *                  seconds. On timeout done_cb will be called with error
+ *                  set to "Timed out" (timeout=0 means never timeout).
  * @param on_reply_cb - (optional) invoked when reply message arrives
  * @param on_ack_cb - (optional) invoked when sent message disposition is set
  * @param done_cb - (optional) called once request is done (for cleanup)
@@ -158,6 +158,7 @@ int qdrc_client_request_CT(qdrc_client_t                 *client,
                            void                          *request_context,
                            qd_composed_field_t           *app_properties,
                            qd_composed_field_t           *body,
+                           uint32_t                       timeout,
                            qdrc_client_on_reply_CT_t      on_reply_cb,
                            qdrc_client_on_ack_CT_t        on_ack_cb,
                            qdrc_client_request_done_CT_t  done_cb);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/address_lookup_client/lookup_client.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 9786f6e..ea2449e 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -412,6 +412,7 @@ static void qcm_addr_lookup_local_search(qcm_lookup_client_t *client, qcm_addr_l
 
 static void qcm_addr_lookup_process_pending_requests_CT(qcm_lookup_client_t *client)
 {
+    const uint32_t timeout = 3;
     int result;
 
     while (client->request_credit > 0 && DEQ_SIZE(client->pending_requests) > 0) {
@@ -426,7 +427,7 @@ static void qcm_addr_lookup_process_pending_requests_CT(qcm_lookup_client_t *cli
             if (iter) {
                 result = qcm_link_route_lookup_request(iter, request->dir, &props, &body);
                 if (result == 0) {
-                    result = qdrc_client_request_CT(client->client_api, request, props, body, on_reply, 0, on_request_done);
+                    result = qdrc_client_request_CT(client->client_api, request, props, body, timeout, on_reply, 0, on_request_done);
                     if (result == 0) {
                         DEQ_INSERT_TAIL(client->sent_requests, request);
                         client->request_credit--;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/edge_router/edge_mgmt.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/edge_mgmt.c b/src/router_core/modules/edge_router/edge_mgmt.c
index 1322003..1fc929a 100644
--- a/src/router_core/modules/edge_router/edge_mgmt.c
+++ b/src/router_core/modules/edge_router/edge_mgmt.c
@@ -251,6 +251,7 @@ int qcm_edge_mgmt_request_CT(qdr_core_t           *core,
                              const char           *identity,
                              const char           *name,
                              qd_composed_field_t  *body,
+                             uint32_t                 timeout,
                              qcm_edge_mgmt_reply_CT_t reply_cb,
                              qcm_edge_mgmt_error_CT_t error_cb)
 {
@@ -291,6 +292,7 @@ int qcm_edge_mgmt_request_CT(qdr_core_t           *core,
                                   req,   // request context
                                   ap_fld,
                                   body,
+                                  timeout,
                                   _mgmt_on_reply_cb_CT,
                                   _mgmt_on_ack_cb_CT,
                                   _mgmt_on_done_cb_CT);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/edge_router/edge_mgmt.h
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/edge_mgmt.h b/src/router_core/modules/edge_router/edge_mgmt.h
index 06ee006..b4e0967 100644
--- a/src/router_core/modules/edge_router/edge_mgmt.h
+++ b/src/router_core/modules/edge_router/edge_mgmt.h
@@ -78,6 +78,9 @@ typedef void (*qcm_edge_mgmt_error_CT_t)(qdr_core_t *core,
  * @param name - (optional) the entity's name
  * @param body - message body content.  Ownership is transferred - the caller
  *               must not reference this on return.
+ * @param timeout - time in seconds to wait for request to complete.  If this
+ *                  timer expires the error_cb will be invoked with the
+ *                  error "Timed out"
  * @param reply_cb - Callback for reply message.
  * @param error_cb - Callback if error occurs
  * @return zero on success, else error. On success a callback is
@@ -90,6 +93,7 @@ int qcm_edge_mgmt_request_CT(qdr_core_t *core,
                              const char *identity,
                              const char *name,
                              qd_composed_field_t *body,
+                             uint32_t                 timeout,
                              qcm_edge_mgmt_reply_CT_t reply_cb,
                              qcm_edge_mgmt_error_CT_t error_cb);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/edge_router/link_route_proxy.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/link_route_proxy.c b/src/router_core/modules/edge_router/link_route_proxy.c
index 5a761f1..a5d6172 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.c
+++ b/src/router_core/modules/edge_router/link_route_proxy.c
@@ -127,9 +127,10 @@ static void _sync_interior_proxies(qdr_core_t *core)
                                      lrp, // context
                                      "CREATE",
                                      CONN_LINK_ROUTE_TYPE,
-                                     0,
+                                     0,  // id
                                      lrp->proxy_name,
                                      _create_body(lrp),
+                                     10,  // timeout
                                      _on_create_reply_CT,
                                      _on_create_error_CT);
             _available_credit -= 1;
@@ -154,6 +155,7 @@ static void _sync_interior_proxies(qdr_core_t *core)
                                      lrp->proxy_id,
                                      lrp->proxy_name,
                                      body,
+                                     10,  // timeout
                                      _on_delete_reply_CT,
                                      _on_delete_error_CT);
             _available_credit -= 1;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/test_hooks/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c
index 32a1ac8..ef09ee4 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -517,9 +517,11 @@ static void _client_on_done_cb(qdr_core_t    *core,
                                void          *request_context,
                                const char    *error)
 {
-    qd_log(core->log, QD_LOG_TRACE,
-           "client test request done rc=%"PRIxPTR" error=%s",
-           request_context,
+    // the system_tests_core_client.py looks for the following
+    // log message during the tests
+    qd_log_level_t level = (error) ? QD_LOG_ERROR : QD_LOG_TRACE;
+    qd_log(core->log, level,
+           "client test request done error=%s",
            (error) ? error : "None");
 }
 
@@ -543,6 +545,7 @@ static void _do_send(test_client_t *tc)
                                (void *)tc->counter,  // request context
                                props,
                                body,
+                               5,  // timeout
                                _client_on_reply_cb,
                                _client_on_ack_cb,
                                _client_on_done_cb);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index d4cd362..ef15959 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -867,6 +867,12 @@ class QdManager(object):
     def query(self, long_type):
         return json.loads(self('QUERY --type=%s' % long_type))
 
+    def get_log(self, limit=None):
+        cmd = 'GET-LOG'
+        if (limit):
+            cmd += " limit=%s" % limit
+        return json.loads(self(cmd))
+
 
 class MgmtMsgProxy(object):
     """

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/tests/system_tests_core_client.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_core_client.py b/tests/system_tests_core_client.py
index 3898aa1..1c0d0c9 100644
--- a/tests/system_tests_core_client.py
+++ b/tests/system_tests_core_client.py
@@ -24,6 +24,7 @@ from __future__ import print_function
 
 from system_test import TestCase
 from system_test import Qdrouterd
+from system_test import QdManager
 from system_test import TIMEOUT
 
 from proton import Message
@@ -84,6 +85,12 @@ class CoreClientAPITest(TestCase):
         self.assertTrue(ts.error is None)
         self.assertTrue(ts.accepted)
 
+    def test_call_timeout(self):
+        qm = QdManager(self, self.router.addresses[0])
+        ts = TestCallTimeout(self.router.addresses[0], qm)
+        ts.run()
+        self.assertEqual("TIMED OUT!", ts.error)
+
 
 class TestService(MessagingHandler):
     # a service that the core client can communicate with
@@ -218,3 +225,33 @@ class TestOldCorrelationId(TestService):
 
     def on_accepted(self, event):
         self.accepted = True
+
+
+class TestCallTimeout(TestService):
+    # test that the timeout is handled properly
+
+    class PeriodicLogScrape(object):
+        # periodically scan the log for the timeout error
+        def __init__(self, service):
+            self.service = service
+
+        def on_timer_task(self, event):
+            log = self.service.qm.get_log()
+            for e in log:
+                if (e[0] == 'ROUTER_CORE'
+                    and e[1] == 'error'
+                    # yes this is the line you're looking for:
+                    and e[2] == 'client test request done error=Timed out'):
+                    self.service.error = "TIMED OUT!"
+                    if self.service._conn:
+                        self.service._conn.close()
+                    return
+            event.reactor.schedule(1, TestCallTimeout.PeriodicLogScrape(self.service))
+
+    def __init__(self, address, qm):
+        super(TestCallTimeout, self).__init__(address, credit=1)
+        self.qm = qm
+
+    def on_message(self, event):
+        # drop it
+        event.reactor.schedule(1, self.PeriodicLogScrape(self))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org