You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2018/12/06 22:45:27 UTC
qpid-dispatch git commit: DISPATCH-1150: add core client request time
out
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 7a9378d58 -> 7ec199d46
DISPATCH-1150: add core client request time out
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/7ec199d4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/7ec199d4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/7ec199d4
Branch: refs/heads/master
Commit: 7ec199d46ea6cb268c27977ccd0a00102da3b371
Parents: 7a9378d
Author: Kenneth Giusti <kg...@apache.org>
Authored: Thu Dec 6 13:22:35 2018 -0500
Committer: Kenneth Giusti <kg...@apache.org>
Committed: Thu Dec 6 17:21:43 2018 -0500
----------------------------------------------------------------------
src/router_core/core_client_api.c | 28 ++++++++++++---
src/router_core/core_client_api.h | 7 ++--
.../address_lookup_client/lookup_client.c | 3 +-
src/router_core/modules/edge_router/edge_mgmt.c | 2 ++
src/router_core/modules/edge_router/edge_mgmt.h | 4 +++
.../modules/edge_router/link_route_proxy.c | 4 ++-
.../modules/test_hooks/core_test_hooks.c | 9 +++--
tests/system_test.py | 6 ++++
tests/system_tests_core_client.py | 37 ++++++++++++++++++++
9 files changed, 88 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/core_client_api.c
----------------------------------------------------------------------
diff --git a/src/router_core/core_client_api.c b/src/router_core/core_client_api.c
index 731cd77..4967aec 100644
--- a/src/router_core/core_client_api.c
+++ b/src/router_core/core_client_api.c
@@ -41,6 +41,7 @@ struct qdrc_client_request_t {
qd_iterator_t *correlation_key;
qd_hash_handle_t *hash_handle;
qdr_delivery_t *delivery;
+ qdr_core_timer_t *timer;
qd_composed_field_t *app_properties;
qd_composed_field_t *body;
@@ -119,6 +120,7 @@ static void _free_request_CT(qdrc_client_t *client,
const char *error);
static qd_message_t *_create_message_CT(qdrc_client_t *client,
qdrc_client_request_t *req);
+static void _timer_expired(qdr_core_t *core, void *context);
static qdrc_endpoint_desc_t sender_endpoint = {
@@ -229,12 +231,13 @@ int qdrc_client_request_CT(qdrc_client_t *client,
void *request_context,
qd_composed_field_t *app_properties,
qd_composed_field_t *body,
+ uint32_t timeout,
qdrc_client_on_reply_CT_t on_reply_cb,
qdrc_client_on_ack_CT_t on_ack_cb,
qdrc_client_request_done_CT_t done_cb)
{
qd_log(client->core->log, QD_LOG_TRACE,
- "New core client request created c=%p, rc=%"PRIuPTR")",
+ "New core client request created c=%p, rc=%"PRIuPTR,
client, request_context);
qdrc_client_request_t *req = new_qdrc_client_request_t();
@@ -246,6 +249,10 @@ int qdrc_client_request_CT(qdrc_client_t *client,
req->on_reply_cb = on_reply_cb;
req->on_ack_cb = on_ack_cb;
req->done_cb = done_cb;
+ if (timeout) {
+ req->timer = qdr_core_timer_CT(client->core, _timer_expired, req);
+ qdr_core_timer_schedule_CT(client->core, req->timer, timeout);
+ }
_send_request_CT(client, req);
return 0;
@@ -287,7 +294,7 @@ static void _flush_send_queue_CT(qdrc_client_t *client)
req->on_send_queue = false;
qd_log(client->core->log, QD_LOG_TRACE,
- "Core client request sent c=%p, rc=%"PRIuPTR" dlv=%p cid=%s)",
+ "Core client request sent c=%p, rc=%"PRIuPTR" dlv=%p cid=%s",
client, req->req_context, req->delivery,
*req->correlation_id ? req->correlation_id : "<none>");
@@ -313,6 +320,9 @@ static void _free_request_CT(qdrc_client_t *client,
qdrc_client_request_t *req,
const char *error)
{
+ if (req->timer) {
+ qdr_core_timer_free_CT(client->core, req->timer);
+ }
if (req->on_send_queue)
DEQ_REMOVE_N(SEND_Q, client->send_queue, req);
if (req->on_unsettled_list)
@@ -347,8 +357,9 @@ static void _free_request_CT(qdrc_client_t *client,
}
qd_log(client->core->log, QD_LOG_TRACE,
- "Freeing core client request c=%p, rc=%"PRIuPTR")",
- client, req->req_context);
+ "Freeing core client request c=%p, rc=%"PRIuPTR" (%s)",
+ client, req->req_context,
+ error ? error : "request complete");
free_qdrc_client_request_t(req);
}
@@ -683,3 +694,12 @@ static qd_message_t *_create_message_CT(qdrc_client_t *client,
return message;
}
+
+
+// a request has timed out
+static void _timer_expired(qdr_core_t *core, void *context)
+{
+ qdrc_client_request_t *req = (qdrc_client_request_t *)context;
+ qdrc_client_t *client = req->client;
+ _free_request_CT(client, req, "Timed out");
+}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/core_client_api.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_client_api.h b/src/router_core/core_client_api.h
index ff78e72..1d2da4c 100644
--- a/src/router_core/core_client_api.h
+++ b/src/router_core/core_client_api.h
@@ -141,14 +141,14 @@ void qdrc_client_free_CT(qdrc_client_t *client);
* @param client - as returned by qdrc_client_CT()
* @param request_context - context for this request that will be passed to
* callbacks
-
* @param app_properties - the application properties for the sent message.
* Ownership is transferred to this call - the caller must not reference the
* composed field on return.
-
* @param body - the message body for the sent message. Ownership is transferred
* to this call - the caller must not reference the composed field on return.
-
+ * @param timeout - abort the request if it does not complete in timeout
+ * seconds. On timeout done_cb will be called with error
+ * set to "Timed out" (timeout=0 means never timeout).
* @param on_reply_cb - (optional) invoked when reply message arrives
* @param on_ack_cb - (optional) invoked when sent message disposition is set
* @param done_cb - (optional) called once request is done (for cleanup)
@@ -158,6 +158,7 @@ int qdrc_client_request_CT(qdrc_client_t *client,
void *request_context,
qd_composed_field_t *app_properties,
qd_composed_field_t *body,
+ uint32_t timeout,
qdrc_client_on_reply_CT_t on_reply_cb,
qdrc_client_on_ack_CT_t on_ack_cb,
qdrc_client_request_done_CT_t done_cb);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/address_lookup_client/lookup_client.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 9786f6e..ea2449e 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -412,6 +412,7 @@ static void qcm_addr_lookup_local_search(qcm_lookup_client_t *client, qcm_addr_l
static void qcm_addr_lookup_process_pending_requests_CT(qcm_lookup_client_t *client)
{
+ const uint32_t timeout = 3;
int result;
while (client->request_credit > 0 && DEQ_SIZE(client->pending_requests) > 0) {
@@ -426,7 +427,7 @@ static void qcm_addr_lookup_process_pending_requests_CT(qcm_lookup_client_t *cli
if (iter) {
result = qcm_link_route_lookup_request(iter, request->dir, &props, &body);
if (result == 0) {
- result = qdrc_client_request_CT(client->client_api, request, props, body, on_reply, 0, on_request_done);
+ result = qdrc_client_request_CT(client->client_api, request, props, body, timeout, on_reply, 0, on_request_done);
if (result == 0) {
DEQ_INSERT_TAIL(client->sent_requests, request);
client->request_credit--;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/edge_router/edge_mgmt.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/edge_mgmt.c b/src/router_core/modules/edge_router/edge_mgmt.c
index 1322003..1fc929a 100644
--- a/src/router_core/modules/edge_router/edge_mgmt.c
+++ b/src/router_core/modules/edge_router/edge_mgmt.c
@@ -251,6 +251,7 @@ int qcm_edge_mgmt_request_CT(qdr_core_t *core,
const char *identity,
const char *name,
qd_composed_field_t *body,
+ uint32_t timeout,
qcm_edge_mgmt_reply_CT_t reply_cb,
qcm_edge_mgmt_error_CT_t error_cb)
{
@@ -291,6 +292,7 @@ int qcm_edge_mgmt_request_CT(qdr_core_t *core,
req, // request context
ap_fld,
body,
+ timeout,
_mgmt_on_reply_cb_CT,
_mgmt_on_ack_cb_CT,
_mgmt_on_done_cb_CT);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/edge_router/edge_mgmt.h
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/edge_mgmt.h b/src/router_core/modules/edge_router/edge_mgmt.h
index 06ee006..b4e0967 100644
--- a/src/router_core/modules/edge_router/edge_mgmt.h
+++ b/src/router_core/modules/edge_router/edge_mgmt.h
@@ -78,6 +78,9 @@ typedef void (*qcm_edge_mgmt_error_CT_t)(qdr_core_t *core,
* @param name - (optional) the entity's name
* @param body - message body content. Ownership is transferred - the caller
* must not reference this on return.
+ * @param timeout - time in seconds to wait for request to complete. If this
+ * timer expires the error_cb will be invoked with the
+ * error "Timed out"
* @param reply_cb - Callback for reply message.
* @param error_cb - Callback if error occurs
* @return zero on success, else error. On success a callback is
@@ -90,6 +93,7 @@ int qcm_edge_mgmt_request_CT(qdr_core_t *core,
const char *identity,
const char *name,
qd_composed_field_t *body,
+ uint32_t timeout,
qcm_edge_mgmt_reply_CT_t reply_cb,
qcm_edge_mgmt_error_CT_t error_cb);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/edge_router/link_route_proxy.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/edge_router/link_route_proxy.c b/src/router_core/modules/edge_router/link_route_proxy.c
index 5a761f1..a5d6172 100644
--- a/src/router_core/modules/edge_router/link_route_proxy.c
+++ b/src/router_core/modules/edge_router/link_route_proxy.c
@@ -127,9 +127,10 @@ static void _sync_interior_proxies(qdr_core_t *core)
lrp, // context
"CREATE",
CONN_LINK_ROUTE_TYPE,
- 0,
+ 0, // id
lrp->proxy_name,
_create_body(lrp),
+ 10, // timeout
_on_create_reply_CT,
_on_create_error_CT);
_available_credit -= 1;
@@ -154,6 +155,7 @@ static void _sync_interior_proxies(qdr_core_t *core)
lrp->proxy_id,
lrp->proxy_name,
body,
+ 10, // timeout
_on_delete_reply_CT,
_on_delete_error_CT);
_available_credit -= 1;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/src/router_core/modules/test_hooks/core_test_hooks.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/test_hooks/core_test_hooks.c b/src/router_core/modules/test_hooks/core_test_hooks.c
index 32a1ac8..ef09ee4 100644
--- a/src/router_core/modules/test_hooks/core_test_hooks.c
+++ b/src/router_core/modules/test_hooks/core_test_hooks.c
@@ -517,9 +517,11 @@ static void _client_on_done_cb(qdr_core_t *core,
void *request_context,
const char *error)
{
- qd_log(core->log, QD_LOG_TRACE,
- "client test request done rc=%"PRIxPTR" error=%s",
- request_context,
+ // the system_tests_core_client.py looks for the following
+ // log message during the tests
+ qd_log_level_t level = (error) ? QD_LOG_ERROR : QD_LOG_TRACE;
+ qd_log(core->log, level,
+ "client test request done error=%s",
(error) ? error : "None");
}
@@ -543,6 +545,7 @@ static void _do_send(test_client_t *tc)
(void *)tc->counter, // request context
props,
body,
+ 5, // timeout
_client_on_reply_cb,
_client_on_ack_cb,
_client_on_done_cb);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/tests/system_test.py
----------------------------------------------------------------------
diff --git a/tests/system_test.py b/tests/system_test.py
index d4cd362..ef15959 100755
--- a/tests/system_test.py
+++ b/tests/system_test.py
@@ -867,6 +867,12 @@ class QdManager(object):
def query(self, long_type):
return json.loads(self('QUERY --type=%s' % long_type))
+ def get_log(self, limit=None):
+ cmd = 'GET-LOG'
+ if (limit):
+ cmd += " limit=%s" % limit
+ return json.loads(self(cmd))
+
class MgmtMsgProxy(object):
"""
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/7ec199d4/tests/system_tests_core_client.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_core_client.py b/tests/system_tests_core_client.py
index 3898aa1..1c0d0c9 100644
--- a/tests/system_tests_core_client.py
+++ b/tests/system_tests_core_client.py
@@ -24,6 +24,7 @@ from __future__ import print_function
from system_test import TestCase
from system_test import Qdrouterd
+from system_test import QdManager
from system_test import TIMEOUT
from proton import Message
@@ -84,6 +85,12 @@ class CoreClientAPITest(TestCase):
self.assertTrue(ts.error is None)
self.assertTrue(ts.accepted)
+ def test_call_timeout(self):
+ qm = QdManager(self, self.router.addresses[0])
+ ts = TestCallTimeout(self.router.addresses[0], qm)
+ ts.run()
+ self.assertEqual("TIMED OUT!", ts.error)
+
class TestService(MessagingHandler):
# a service that the core client can communicate with
@@ -218,3 +225,33 @@ class TestOldCorrelationId(TestService):
def on_accepted(self, event):
self.accepted = True
+
+
+class TestCallTimeout(TestService):
+ # test that the timeout is handled properly
+
+ class PeriodicLogScrape(object):
+ # periodically scan the log for the timeout error
+ def __init__(self, service):
+ self.service = service
+
+ def on_timer_task(self, event):
+ log = self.service.qm.get_log()
+ for e in log:
+ if (e[0] == 'ROUTER_CORE'
+ and e[1] == 'error'
+ # yes this is the line you're looking for:
+ and e[2] == 'client test request done error=Timed out'):
+ self.service.error = "TIMED OUT!"
+ if self.service._conn:
+ self.service._conn.close()
+ return
+ event.reactor.schedule(1, TestCallTimeout.PeriodicLogScrape(self.service))
+
+ def __init__(self, address, qm):
+ super(TestCallTimeout, self).__init__(address, credit=1)
+ self.qm = qm
+
+ def on_message(self, event):
+ # drop it
+ event.reactor.schedule(1, self.PeriodicLogScrape(self))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org