You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2018/12/06 19:34:01 UTC

qpid-dispatch git commit: DISPATCH-1194 - Added client-side module for link-route address lookup. WIP.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 81d2555db -> 2dde7030f


DISPATCH-1194 - Added client-side module for link-route address lookup.  WIP.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2dde7030
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2dde7030
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2dde7030

Branch: refs/heads/master
Commit: 2dde7030fb032a54fdefe5b7c5c4a39a4f1bfd6e
Parents: 81d2555
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Dec 6 14:33:34 2018 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Dec 6 14:33:34 2018 -0500

----------------------------------------------------------------------
 src/address_lookup_utils.c                      |   8 +-
 src/router_core/connections.c                   |   4 +-
 src/router_core/core_attach_address_lookup.h    |   4 +-
 .../address_lookup_client/lookup_client.c       | 319 ++++++++++++++++++-
 src/router_core/router_core_private.h           |   1 +
 src/router_core/terminus.c                      |   2 +-
 6 files changed, 319 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/address_lookup_utils.c
----------------------------------------------------------------------
diff --git a/src/address_lookup_utils.c b/src/address_lookup_utils.c
index 4c92921..9935cf5 100644
--- a/src/address_lookup_utils.c
+++ b/src/address_lookup_utils.c
@@ -28,10 +28,10 @@
 /* create the message application properties and body for the link route lookup
  * request message
  */
-int qcm_link_route_lookup_msg(qd_iterator_t        *address,
-                              qd_direction_t        dir,
-                              qd_composed_field_t **properties,
-                              qd_composed_field_t **body)
+int qcm_link_route_lookup_request(qd_iterator_t        *address,
+                                  qd_direction_t        dir,
+                                  qd_composed_field_t **properties,
+                                  qd_composed_field_t **body)
 {
     *properties = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, 0);
     if (!*properties)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index f8e3d51..17feaea 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -1356,7 +1356,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
                 // This link has a target address
                 //
                 if (core->addr_lookup_handler)
-                    core->addr_lookup_handler(core, conn, link, dir, source, target);
+                    core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target);
                 else {
                     qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                     qdr_terminus_free(source);
@@ -1383,7 +1383,7 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act
         switch (link->link_type) {
         case QD_LINK_ENDPOINT: {
             if (core->addr_lookup_handler)
-                core->addr_lookup_handler(core, conn, link, dir, source, target);
+                core->addr_lookup_handler(core->addr_lookup_context, conn, link, dir, source, target);
             else {
                 qdr_link_outbound_detach_CT(core, link, 0, QDR_CONDITION_NO_ROUTE_TO_DESTINATION, true);
                 qdr_terminus_free(source);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/core_attach_address_lookup.h
----------------------------------------------------------------------
diff --git a/src/router_core/core_attach_address_lookup.h b/src/router_core/core_attach_address_lookup.h
index d05b9cb..1c5fc50 100644
--- a/src/router_core/core_attach_address_lookup.h
+++ b/src/router_core/core_attach_address_lookup.h
@@ -35,14 +35,14 @@
  * message or link routing.  This operation may be synchronoue (completed before it
  * returns) or asynchronous (completed later).
  *
- * @param core Pointer to the core state.
+ * @param context Module context for address-lookup module
  * @param conn Pointer to the connection over which the attach arrived.
  * @param link Pointer to the attaching link.
  * @param dir The direction of message flow for the link.
  * @param source The source terminus for the attach.
  * @param target The target terminus for the attach.
  */
-typedef void (*qdrc_attach_addr_lookup_t) (qdr_core_t       *core,
+typedef void (*qdrc_attach_addr_lookup_t) (void             *context,
                                            qdr_connection_t *conn,
                                            qdr_link_t       *link,
                                            qd_direction_t    dir,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/modules/address_lookup_client/lookup_client.c
----------------------------------------------------------------------
diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c
index 229b00a..2b575db 100644
--- a/src/router_core/modules/address_lookup_client/lookup_client.c
+++ b/src/router_core/modules/address_lookup_client/lookup_client.c
@@ -20,9 +20,53 @@
 #include "module.h"
 #include "core_attach_address_lookup.h"
 #include "router_core_private.h"
+#include "core_events.h"
+#include "core_client_api.h"
+#include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/discriminator.h>
+#include <qpid/dispatch/address_lookup_server.h>
 #include <stdio.h>
 
+static uint64_t on_reply(qdr_core_t    *core,
+                         qdrc_client_t *api_client,
+                         void          *user_context,
+                         void          *request_context,
+                         qd_iterator_t *app_properties,
+                         qd_iterator_t *body);  // forward declaration: reply callback passed to qdrc_client_request_CT()
+
+static void on_request_done(qdr_core_t    *core,
+                            qdrc_client_t *api_client,
+                            void          *user_context,
+                            void          *request_context,
+                            const char    *error);  // forward declaration: completion callback passed to qdrc_client_request_CT()
+
+// State saved for one link attach whose address lookup is queued or in flight.
+typedef struct qcm_addr_lookup_request_t {
+    DEQ_LINKS(struct qcm_addr_lookup_request_t);  // linkage for the pending/sent request lists
+    qdr_connection_t  *conn;    // connection argument given to the lookup handler
+    qdr_link_t        *link;    // attaching link given to the lookup handler
+    qd_direction_t     dir;     // link direction; QD_INCOMING selects the target terminus for lookup, otherwise the source
+    qdr_terminus_t    *source;  // source terminus from the attach
+    qdr_terminus_t    *target;  // target terminus from the attach
+} qcm_addr_lookup_request_t;
+
+DEQ_DECLARE(qcm_addr_lookup_request_t, qcm_addr_lookup_request_list_t);  // list type used by qcm_lookup_client_t
+ALLOC_DECLARE(qcm_addr_lookup_request_t);  // presumably supplies new_/free_qcm_addr_lookup_request_t() used below - confirm in the alloc headers
+ALLOC_DEFINE(qcm_addr_lookup_request_t);
+
+// Module context for the address-lookup client; one instance is created in the module init handler.
+typedef struct qcm_lookup_client_t {
+    qdr_core_t                     *core;               // core pointer captured at init
+    qdrc_event_subscription_t      *event_sub;          // subscription for the edge-connection established/lost events
+    qdr_connection_t               *edge_conn;          // set on QDRC_EVENT_CONN_EDGE_ESTABLISHED, zeroed on QDRC_EVENT_CONN_EDGE_LOST
+    uint32_t                        request_credit;     // credit last reported by on_flow, decremented per request sent
+    bool                            client_api_active;  // last state reported by on_state
+    qdrc_client_t                  *client_api;         // client-API session created on the edge connection
+    qcm_addr_lookup_request_list_t  pending_requests;   // requests queued until credit is available
+    qcm_addr_lookup_request_list_t  sent_requests;      // requests handed to the client API, removed in on_request_done
+} qcm_lookup_client_t;
+
+
 static char* disambiguated_link_name(qdr_connection_info_t *conn, char *original)
 {
     size_t olen = strlen(original);
@@ -338,24 +382,269 @@ static void qdr_link_react_to_first_attach_CT(qdr_core_t       *core,
 }
 
 
-static void qcm_addr_lookup_CT(qdr_core_t       *core,
+static void qcm_addr_lookup_local_search(qcm_lookup_client_t *client, qcm_addr_lookup_request_t *request)
+{   // Resolve a saved request with the core's own synchronous lookup and react to the attach; the request is not freed here.
+    bool            link_route;
+    bool            unavailable;
+    bool            core_endpoint;
+    qdr_terminus_t *term = request->dir == QD_INCOMING ? request->target : request->source;  // incoming links are looked up by target, others by source
+    qdr_address_t  *addr = qdr_lookup_terminus_address_CT(client->core,
+                                                          request->dir,
+                                                          request->conn,
+                                                          term,
+                                                          true,
+                                                          true,
+                                                          &link_route,
+                                                          &unavailable,
+                                                          &core_endpoint);  // the three flags are outputs consumed by the call below
+    qdr_link_react_to_first_attach_CT(client->core,
+                                      request->conn,
+                                      addr,
+                                      request->link,
+                                      request->dir,
+                                      request->source,
+                                      request->target,
+                                      link_route,
+                                      unavailable,
+                                      core_endpoint);
+}
+
+// Send queued lookups through the client API while credit remains; any request that cannot be sent is resolved locally.
+static void qcm_addr_lookup_process_pending_requests_CT(qcm_lookup_client_t *client)
+{
+    int result;
+
+    while (client->request_credit > 0 && DEQ_SIZE(client->pending_requests) > 0) {
+        qcm_addr_lookup_request_t *request = DEQ_HEAD(client->pending_requests);
+        DEQ_REMOVE_HEAD(client->pending_requests);
+
+        do {  // single-pass block: the 'break' below skips the local fallback at the bottom
+            qd_composed_field_t *props;
+            qd_composed_field_t *body;
+            qd_iterator_t       *iter = qdr_terminus_get_address(request->dir == QD_INCOMING ? request->target : request->source);
+
+            if (iter) {
+                result = qcm_link_route_lookup_request(iter, request->dir, &props, &body);
+                if (result == 0) {
+                    result = qdrc_client_request_CT(client->client_api, request, props, body, on_reply, 0, on_request_done);
+                    if (result == 0) {
+                        DEQ_INSERT_TAIL(client->sent_requests, request);
+                        client->request_credit--;  // one unit of credit consumed per request sent
+                        break;  // request stays on sent_requests until on_request_done removes and frees it
+                    }
+
+                    //
+                    // TODO - set a timer (or use a timeout in the client API)
+                    //
+
+                    qd_compose_free(props);  // the send failed, so the composed fields are released here
+                    qd_compose_free(body);
+                }
+            }
+
+            //
+            // If we get here, we failed to launch the asynchronous lookup.  Fall back to a local,
+            // synchronous lookup.
+            //
+            qcm_addr_lookup_local_search(client, request);
+            free_qcm_addr_lookup_request_t(request);
+        } while (false);
+    }
+}
+
+
+//================================================================================
+// Address Lookup Handler
+//================================================================================
+// Attach address-lookup handler installed in core->addr_lookup_handler; 'context' is the qcm_lookup_client_t created at init.
+static void qcm_addr_lookup_CT(void             *context,
                                qdr_connection_t *conn,
                                qdr_link_t       *link,
                                qd_direction_t    dir,
                                qdr_terminus_t   *source,
                                qdr_terminus_t   *target)
+{
+    qcm_lookup_client_t *client = (qcm_lookup_client_t*) context;
+    bool                 link_route;
+    bool                 unavailable;
+    bool                 core_endpoint;
+    qdr_terminus_t      *term = dir == QD_INCOMING ? target : source;  // incoming links are looked up by target, others by source
+
+    if (client->core->router_mode == QD_ROUTER_MODE_EDGE
+        && client->client_api_active
+        && qdr_terminus_get_address(term) != 0) {
+        //
+        // We are in edge mode, there is an active edge connection, and the terminus has an address.
+        // Set up and schedule an asynchronous lookup request.
+        //
+        qcm_addr_lookup_request_t *request = new_qcm_addr_lookup_request_t();
+        DEQ_ITEM_INIT(request);
+        request->conn   = conn;
+        request->link   = link;
+        request->dir    = dir;
+        request->source = source;
+        request->target = target;
+
+        DEQ_INSERT_TAIL(client->pending_requests, request);
+        qcm_addr_lookup_process_pending_requests_CT(client);  // sends immediately if credit is available, otherwise the request stays queued
+    } else {
+        //
+        // If this lookup doesn't meet the criteria for asynchronous action, perform the built-in, synchronous address lookup
+        //
+        qdr_address_t *addr = qdr_lookup_terminus_address_CT(client->core, dir, conn, term, true, true, &link_route, &unavailable, &core_endpoint);
+        qdr_link_react_to_first_attach_CT(client->core, conn, addr, link, dir, source, target, link_route, unavailable, core_endpoint);
+    }
+}
 
+
+//================================================================================
+// Core Client API Handlers
+//================================================================================
+// Client-API state callback: records the active flag; on going inactive, zeroes credit and resolves all pending requests locally.
+static void on_state(qdr_core_t    *core,
+                     qdrc_client_t *api_client,
+                     void          *user_context,
+                     bool           active)
 {
-    bool link_route;
-    bool unavailable;
-    bool core_endpoint;
-    qdr_terminus_t *term = dir == QD_INCOMING ? target : source;
+    qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
+
+    client->client_api_active = active;
+    if (!active) {
+        //
+        // Client-API links are down, set our available credit to zero.
+        //
+        client->request_credit = 0;
 
-    qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, conn, term, true, true, &link_route, &unavailable, &core_endpoint);
-    qdr_link_react_to_first_attach_CT(core, conn, addr, link, dir, source, target, link_route, unavailable, core_endpoint);
+        //
+        // Locally process all pending requests
+        //
+        qcm_addr_lookup_request_t *request = DEQ_HEAD(client->pending_requests);
+        while (request) {
+            DEQ_REMOVE_HEAD(client->pending_requests);
+            qcm_addr_lookup_local_search(client, request);
+            free_qcm_addr_lookup_request_t(request);
+            request = DEQ_HEAD(client->pending_requests);  // sent_requests is not touched here
+        }
+    }
 }
 
 
+static void on_flow(qdr_core_t    *core,
+                    qdrc_client_t *api_client,
+                    void          *user_context,
+                    int            available_credit,
+                    bool           drain)
+{   // Client-API credit callback: stores the new credit, sends queued requests, then discards leftover credit on drain.
+    qcm_lookup_client_t *client = (qcm_lookup_client_t*) user_context;
+
+    client->request_credit = available_credit;  // NOTE(review): int stored into uint32_t; a negative value would wrap - confirm the API never passes one
+
+    //
+    // If we have positive credit, process any pending requests
+    //
+    if (client->request_credit > 0)
+        qcm_addr_lookup_process_pending_requests_CT(client);
+
+    if (drain)
+        client->request_credit = 0;  // applied after pending requests have had a chance to use the credit
+}
+
+// Reply callback for a lookup request: decodes the reply, then (for now) resolves the request with the local lookup; returns 0.
+static uint64_t on_reply(qdr_core_t    *core,
+                         qdrc_client_t *api_client,
+                         void          *user_context,
+                         void          *request_context,
+                         qd_iterator_t *app_properties,
+                         qd_iterator_t *body)
+{
+    qcm_lookup_client_t         *client  = (qcm_lookup_client_t*) user_context;
+    qcm_addr_lookup_request_t   *request = (qcm_addr_lookup_request_t*) request_context;  // not freed here; on_request_done frees it
+    qcm_address_lookup_status_t  status;
+    bool                         is_link_route;     // decoded below but not yet used
+    bool                         has_destinations;  // decoded below but not yet used
+
+    status = qcm_link_route_lookup_decode(app_properties, body, &is_link_route, &has_destinations);
+    if (status == QCM_ADDR_LOOKUP_OK) {
+        //
+        // TODO - Add resolution using the received data
+        //
+        qcm_addr_lookup_local_search(client, request);
+    } else {
+        qcm_addr_lookup_local_search(client, request);  // NOTE(review): both branches are identical until the TODO above is implemented
+    }
+
+    return 0;  // NOTE(review): meaning of the returned value is defined by the client API, not visible here - confirm
+}
+
+// Completion callback for a lookup request: on error falls back to the local lookup, then unlinks and frees the request.
+static void on_request_done(qdr_core_t    *core,
+                            qdrc_client_t *api_client,
+                            void          *user_context,
+                            void          *request_context,
+                            const char    *error)
+{
+    qcm_lookup_client_t       *client  = (qcm_lookup_client_t*) user_context;
+    qcm_addr_lookup_request_t *request = (qcm_addr_lookup_request_t*) request_context;
+
+    if (error) {
+        qcm_addr_lookup_local_search(client, request);  // NOTE(review): assumes on_reply did not already run for this request - confirm with the client API
+    }
+
+    DEQ_REMOVE(client->sent_requests, request);  // the request was placed on sent_requests when it was sent
+    free_qcm_addr_lookup_request_t(request);
+}
+
+
+//================================================================================
+// Event Handlers
+//================================================================================
+// Core event handler: creates the client-API session when the edge connection is established and frees it when the connection is lost.
+static void on_conn_event(void             *context,
+                          qdrc_event_t      event_type,
+                          qdr_connection_t *conn)
+{
+    qcm_lookup_client_t *client = (qcm_lookup_client_t*) context;
+
+    switch (event_type) {
+    case QDRC_EVENT_CONN_EDGE_ESTABLISHED:
+        client->edge_conn      = conn;
+        client->request_credit = 0;  // credit is granted later through on_flow
+
+        //
+        // Set up a Client API session on the edge connection.
+        //
+        qdr_terminus_t *target = qdr_terminus(0);
+        qdr_terminus_set_address(target, QD_TERMINUS_ADDRESS_LOOKUP);
+        client->client_api = qdrc_client_CT(client->core,
+                                            client->edge_conn,
+                                            target,
+                                            250,  // NOTE(review): meaning of this constant is not visible here (possibly a credit window) - confirm
+                                            client,
+                                            on_state,
+                                            on_flow);
+        break;
+
+    case QDRC_EVENT_CONN_EDGE_LOST:
+        client->edge_conn      = 0;
+        client->request_credit = 0;  // NOTE(review): client_api_active and the request lists are not reset here - presumably on_state(false) handles it; confirm
+
+        //
+        // Remove the Client API session.
+        //
+        qdrc_client_free_CT(client->client_api);
+        client->client_api = 0;
+        break;
+
+    default:
+        assert(false);  // only the two event types subscribed at init are expected
+        break;
+    }
+}
+
+//================================================================================
+// Module Handlers
+//================================================================================
+
 static bool qcm_addr_lookup_client_enable_CT(qdr_core_t *core)
 {
     return true;
@@ -365,16 +654,26 @@ static bool qcm_addr_lookup_client_enable_CT(qdr_core_t *core)
 static void qcm_addr_lookup_client_init_CT(qdr_core_t *core, void **module_context)
 {
     assert(core->addr_lookup_handler == 0);
+    qcm_lookup_client_t *client = NEW(qcm_lookup_client_t);  // module context; released in the module final handler
+    ZERO(client);
+    // Subscribe to edge-connection events so the client-API session follows the edge connection.
+    client->core      = core;
+    client->event_sub = qdrc_event_subscribe_CT(client->core,
+                                                QDRC_EVENT_CONN_EDGE_ESTABLISHED | QDRC_EVENT_CONN_EDGE_LOST,
+                                                on_conn_event, 0, 0,
+                                                client);
 
     core->addr_lookup_handler = qcm_addr_lookup_CT;
-    *module_context           = core;
+    core->addr_lookup_context = client;  // passed back as the first argument of the lookup handler
+    *module_context           = client;  // handed back to the module final handler
 }
 
 
 static void qcm_addr_lookup_client_final_CT(void *module_context)
 {
-    qdr_core_t *core = (qdr_core_t*) module_context;
-    core->addr_lookup_handler = 0;
+    qcm_lookup_client_t *client = (qcm_lookup_client_t*) module_context;  // the context stored by the init handler
+    client->core->addr_lookup_handler = 0;  // NOTE(review): core->addr_lookup_context is left pointing at the client freed below - consider clearing it
+    free(client);  // NOTE(review): event_sub is never unsubscribed and any queued requests are not freed before this - confirm/fix
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index bb4af57..82f6c5e 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -729,6 +729,7 @@ struct qdr_core_t {
     qdr_link_list_t       open_links;
 
     qdrc_attach_addr_lookup_t  addr_lookup_handler;
+    void                      *addr_lookup_context;
 
     //
     // Agent section

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2dde7030/src/router_core/terminus.c
----------------------------------------------------------------------
diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c
index a717ae5..1486148 100644
--- a/src/router_core/terminus.c
+++ b/src/router_core/terminus.c
@@ -166,7 +166,7 @@ void qdr_terminus_set_address_iterator(qdr_terminus_t *term, qd_iterator_t *addr
 
 qd_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term)
 {
-    if (qdr_terminus_is_anonymous(term))
+    if (term->address == 0)
         return 0;
 
     return term->address->iterator;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org