You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/12/06 19:34:01 UTC
qpid-dispatch git commit: DISPATCH-1194 - Added client-side module
for link-route address lookup. WIP.
Repository: qpid-dispatch
Updated Branches:
refs/heads/master 81d2555db -> 2dde7030f
DISPATCH-1194 - Added client-side module for link-route address lookup. WIP.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2dde7030
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2dde7030
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2dde7030
Branch: refs/heads/master
Commit: 2dde7030fb032a54fdefe5b7c5c4a39a4f1bfd6e
Parents: 81d2555
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Dec 6 14:33:34 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Dec 6 14:33:34 2018 -0500
----------------------------------------------------------------------
src/address_lookup_utils.c | 8 +-
src/router_core/connections.c | 4 +-
src/router_core/core_attach_address_lookup.h | 4 +-
.../address_lookup_client/lookup_client.c | 319 ++++++++++++++++++-
src/router_core/router_core_private.h | 1 +
src/router_core/terminus.c | 2 +-
6 files changed, 319 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/address_lookup_utils.c
----------------------------------------------------------------------
diff --git a/src/address_lookup_utils.c b/src/address_lookup_utils.c
index 4c92921..9935cf5 100644
--- a/src/address_lookup_utils.c
+++ b/src/address_lookup_utils.c
@@ -28,10 +28,10 @@
/* create the message application properties and body for the link route lookup
* request message
*/
-int qcm_link_route_lookup_msg(qd_iterator_t *address,
- qd_direction_t dir,
- qd_composed_field_t **properties,
- qd_composed_field_t **body)
+int qcm_link_route_lookup_request(qd_iterator_t *address,
+ qd_direction_t dir,
+ qd_composed_field_t **properties,
+ qd_composed_field_t **body)
{
*properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
if (!*properties)
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index f8e3d51..17feaea 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1356,7 +1356,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
// This link has a target address
//
if (core->addr_lookup_handler)
- core->addr_lookup_handler(core, conn, link, dir, source, target);
+ core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target);
else {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
@@ -1383,7 +1383,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
switch (link->link_type) {
case QD_LINK_ENDPOINT: {
if (core->addr_lookup_handler)
- core->addr_lookup_handler(core, conn, link, dir, source, target);
+ core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target);
else {
qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
qdr_terminus_free(source);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/core_attach_address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_attach_address_lookup.h b/src/router_core/core_attach_address_lookup.h
index d05b9cb..1c5fc50 100644
--- a/src/router_core/core_attach_address_lookup.h
+++ b/src/router_core/core_attach_address_lookup.h
@@ -35,14 +35,14 @@
* message or link routing. This operation may be synchronoue (completed before it
* returns) or asynchronous (completed later).
*
- * @param core Pointer to the core state.
+ * @param context Module context for address-lookup module
* @param conn Pointer to the connection over which the attach arrived.
* @param link Pointer to the attaching link.
* @param dir The direction of message flow for the link.
* @param source The source terminus for the attach.
* @param target The target terminus for the attach.
*/
-typedef void (*qdrc_attach_addr_lookup_t) (qdr_core_t *core,
+typedef void (*qdrc_attach_addr_lookup_t) (void *context,
qdr_connection_t *conn,
qdr_link_t *link,
qd_direction_t dir,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/modules/address_lookup_client/lookup_client.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 229b00a..2b575db 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -20,9 +20,53 @@
#include "module.h"
#include "core_attach_address_lookup.h"
#include "router_core_private.h"
+#include "core_events.h"
+#include "core_client_api.h"
+#include <qpid/dispatch/ctools.h>
#include <qpid/dispatch/discriminator.h>
+#include <qpid/dispatch/address_lookup_server.h>
#include <stdio.h>
+static uint64_t on_reply(qdr_core_t *core,
+ qdrc_client_t *api_client,
+ void *user_context,
+ void *request_context,
+ qd_iterator_t *app_properties,
+ qd_iterator_t *body);
+
+static void on_request_done(qdr_core_t *core,
+ qdrc_client_t *api_client,
+ void *user_context,
+ void *request_context,
+ const char *error);
+
+
+typedef struct qcm_addr_lookup_request_t {
+ DEQ_LINKS(struct qcm_addr_lookup_request_t);
+ qdr_connection_t *conn;
+ qdr_link_t *link;
+ qd_direction_t dir;
+ qdr_terminus_t *source;
+ qdr_terminus_t *target;
+} qcm_addr_lookup_request_t;
+
+DEQ_DECLARE(qcm_addr_lookup_request_t, qcm_addr_lookup_request_list_t);
+ALLOC_DECLARE(qcm_addr_lookup_request_t);
+ALLOC_DEFINE(qcm_addr_lookup_request_t);
+
+
+typedef struct qcm_lookup_client_t {
+ qdr_core_t *core;
+ qdrc_event_subscription_t *event_sub;
+ qdr_connection_t *edge_conn;
+ uint32_t request_credit;
+ bool client_api_active;
+ qdrc_client_t *client_api;
+ qcm_addr_lookup_request_list_t pending_requests;
+ qcm_addr_lookup_request_list_t sent_requests;
+} qcm_lookup_client_t;
+
+
static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
{
size_t olen = strlen(original);
@@ -338,24 +382,269 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t *core,
}
-static void qcm_addr_lookup_CT(qdr_core_t *core,
+static void qcm_addr_lookup_local_search(qcm_lookup_client_t *client, qcm_addr_lookup_request_t *request)
+{
+ bool link_route;
+ bool unavailable;
+ bool core_endpoint;
+ qdr_terminus_t *term = request->dir == QD_INCOMING ? request->target : request->source;
+ qdr_address_t *addr = qdr_lookup_terminus_address_CT(client->core,
+ request->dir,
+ request->conn,
+ term,
+ true,
+ true,
+ &link_route,
+ &unavailable,
+ &core_endpoint);
+ qdr_link_react_to_first_attach_CT(client->core,
+ request->conn,
+ addr,
+ request->link,
+ request->dir,
+ request->source,
+ request->target,
+ link_route,
+ unavailable,
+ core_endpoint);
+}
+
+
+static void qcm_addr_lookup_process_pending_requests_CT(qcm_lookup_client_t *client)
+{
+ int result;
+
+ while (client->request_credit > 0 && DEQ_SIZE(client->pending_requests) > 0) {
+ qcm_addr_lookup_request_t *request = DEQ_HEAD(client->pending_requests);
+ DEQ_REMOVE_HEAD(client->pending_requests);
+
+ do {
+ qd_composed_field_t *props;
+ qd_composed_field_t *body;
+ qd_iterator_t *iter = qdr_terminus_get_address(request->dir == QD_INCOMING ? request->target : request->source);
+
+ if (iter) {
+ result = qcm_link_route_lookup_request(iter, request->dir, &props, &body);
+ if (result == 0) {
+ result = qdrc_client_request_CT(client->client_api, request, props, body, on_reply, 0, on_request_done);
+ if (result == 0) {
+ DEQ_INSERT_TAIL(client->sent_requests, request);
+ client->request_credit--;
+ break;
+ }
+
+ //
+ // TODO - set a timer (or use a timeout in the client API)
+ //
+
+ qd_compose_free(props);
+ qd_compose_free(body);
+ }
+ }
+
+ //
+ // If we get here, we failed to launch the asynchronous lookup. Fall back to a local,
+ // synchronous lookup.
+ //
+ qcm_addr_lookup_local_search(client, request);
+ free_qcm_addr_lookup_request_t(request);
+ } while (false);
+ }
+}
+
+
+//================================================================================
+// Address Lookup Handler
+//================================================================================
+
+static void qcm_addr_lookup_CT(void *context,
qdr_connection_t *conn,
qdr_link_t *link,
qd_direction_t dir,
qdr_terminus_t *source,
qdr_terminus_t *target)
+{
+ qcm_lookup_client_t *client = (qcm_lookup_client_t*) context;
+ bool link_route;
+ bool unavailable;
+ bool core_endpoint;
+ qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+
+ if (client->core->router_mode == QD_ROUTER_MODE_EDGE
+ && client->client_api_active
+ && qdr_terminus_get_address(term) != 0) {
+ //
+ // We are in edge mode, there is an active edge connection, and the terminus has an address.
+ // Set up and scehdule an asynchronous lookup request.
+ //
+ qcm_addr_lookup_request_t *request = new_qcm_addr_lookup_request_t();
+ DEQ_ITEM_INIT(request);
+ request->conn = conn;
+ request->link = link;
+ request->dir = dir;
+ request->source = source;
+ request->target = target;
+
+ DEQ_INSERT_TAIL(client->pending_requests, request);
+ qcm_addr_lookup_process_pending_requests_CT(client);
+ } else {
+ //
+ // If this lookup doesn't meet the criteria for asynchronous action, perform the built-in, synchronous address lookup
+ //
+ qdr_address_t *addr = qdr_lookup_terminus_address_CT(client->core, dir, conn, term, true, true, &link_route, &unavailable, &core_endpoint);
+ qdr_link_react_to_first_attach_CT(client->core, conn, addr, link, dir, source, target, link_route, unavailable, core_endpoint);
+ }
+}
+
+//================================================================================
+// Core Client API Handlers
+//================================================================================
+
+static void on_state(qdr_core_t *core,
+ qdrc_client_t *api_client,
+ void *user_context,
+ bool active)
{
- bool link_route;
- bool unavailable;
- bool core_endpoint;
- qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+ qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
+
+ client->client_api_active = active;
+ if (!active) {
+ //
+ // Client-API links are down, set our available credit to zero.
+ //
+ client->request_credit = 0;
- qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, term, true, true, &link_route, &unavailable, &core_endpoint);
- qdr_link_react_to_first_attach_CT(core, conn, addr, link, dir, source, target, link_route, unavailable, core_endpoint);
+ //
+ // Locally process all pending requests
+ //
+ qcm_addr_lookup_request_t *request = DEQ_HEAD(client->pending_requests);
+ while (request) {
+ DEQ_REMOVE_HEAD(client->pending_requests);
+ qcm_addr_lookup_local_search(client, request);
+ free_qcm_addr_lookup_request_t(request);
+ request = DEQ_HEAD(client->pending_requests);
+ }
+ }
}
+static void on_flow(qdr_core_t *core,
+ qdrc_client_t *api_client,
+ void *user_context,
+ int available_credit,
+ bool drain)
+{
+ qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
+
+ client->request_credit = available_credit;
+
+ //
+ // If we have positive credit, process any pending requests
+ //
+ if (client->request_credit > 0)
+ qcm_addr_lookup_process_pending_requests_CT(client);
+
+ if (drain)
+ client->request_credit = 0;
+}
+
+
+static uint64_t on_reply(qdr_core_t *core,
+ qdrc_client_t *api_client,
+ void *user_context,
+ void *request_context,
+ qd_iterator_t *app_properties,
+ qd_iterator_t *body)
+{
+ qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
+ qcm_addr_lookup_request_t *request = (qcm_addr_lookup_request_t*) request_context;
+ qcm_address_lookup_status_t status;
+ bool is_link_route;
+ bool has_destinations;
+
+ status = qcm_link_route_lookup_decode(app_properties, body, &is_link_route, &has_destinations);
+ if (status == QCM_ADDR_LOOKUP_OK) {
+ //
+ // TODO - Add resolution using the received data
+ //
+ qcm_addr_lookup_local_search(client, request);
+ } else {
+ qcm_addr_lookup_local_search(client, request);
+ }
+
+ return 0;
+}
+
+
+static void on_request_done(qdr_core_t *core,
+ qdrc_client_t *api_client,
+ void *user_context,
+ void *request_context,
+ const char *error)
+{
+ qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
+ qcm_addr_lookup_request_t *request = (qcm_addr_lookup_request_t*) request_context;
+
+ if (error) {
+ qcm_addr_lookup_local_search(client, request);
+ }
+
+ DEQ_REMOVE(client->sent_requests, request);
+ free_qcm_addr_lookup_request_t(request);
+}
+
+
+//================================================================================
+// Event Handlers
+//================================================================================
+
+static void on_conn_event(void *context,
+ qdrc_event_t event_type,
+ qdr_connection_t *conn)
+{
+ qcm_lookup_client_t *client = (qcm_lookup_client_t*) context;
+
+ switch (event_type) {
+ case QDRC_EVENT_CONN_EDGE_ESTABLISHED:
+ client->edge_conn = conn;
+ client->request_credit = 0;
+
+ //
+ // Set up a Client API session on the edge connection.
+ //
+ qdr_terminus_t *target = qdr_terminus(0);
+ qdr_terminus_set_address(target, QD_TERMINUS_ADDRESS_LOOKUP);
+ client->client_api = qdrc_client_CT(client->core,
+ client->edge_conn,
+ target,
+ 250,
+ client,
+ on_state,
+ on_flow);
+ break;
+
+ case QDRC_EVENT_CONN_EDGE_LOST:
+ client->edge_conn = 0;
+ client->request_credit = 0;
+
+ //
+ // Remove the Client API session.
+ //
+ qdrc_client_free_CT(client->client_api);
+ client->client_api = 0;
+ break;
+
+ default:
+ assert(false);
+ break;
+ }
+}
+
+//================================================================================
+// Module Handlers
+//================================================================================
+
static bool qcm_addr_lookup_client_enable_CT(qdr_core_t *core)
{
return true;
@@ -365,16 +654,26 @@ static bool qcm_addr_lookup_client_enable_CT(qdr_core_t *core)
static void qcm_addr_lookup_client_init_CT(qdr_core_t *core, void **module_context)
{
assert(core->addr_lookup_handler == 0);
+ qcm_lookup_client_t *client = NEW(qcm_lookup_client_t);
+ ZERO(client);
+
+ client->core = core;
+ client->event_sub = qdrc_event_subscribe_CT(client->core,
+ QDRC_EVENT_CONN_EDGE_ESTABLISHED | QDRC_EVENT_CONN_EDGE_LOST,
+ on_conn_event, 0, 0,
+ client);
core->addr_lookup_handler = qcm_addr_lookup_CT;
- *module_context = core;
+ core->addr_lookup_context = client;
+ *module_context = client;
}
static void qcm_addr_lookup_client_final_CT(void *module_context)
{
- qdr_core_t *core = (qdr_core_t*) module_context;
- core->addr_lookup_handler = 0;
+ qcm_lookup_client_t *client = (qcm_lookup_client_t*) module_context;
+ client->core->addr_lookup_handler = 0;
+ free(client);
}
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index bb4af57..82f6c5e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -729,6 +729,7 @@ struct qdr_core_t {
qdr_link_list_t open_links;
qdrc_attach_addr_lookup_t addr_lookup_handler;
+ void *addr_lookup_context;
//
// Agent section
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index a717ae5..1486148 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -166,7 +166,7 @@ void qdr_terminus_set_address_iterator(qdr_terminus_t *term, qd_iterator_t *addr
qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term)
{
- if (qdr_terminus_is_anonymous(term))
+ if (term->address == 0)
return 0;
return term->address->iterator;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org