You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/07/12 23:44:15 UTC

svn commit: r1502698 - in /qpid/trunk/qpid/extras/dispatch: include/qpid/dispatch/container.h include/qpid/dispatch/router.h src/agent.c src/container.c src/parse.c src/python_embedded.c src/router_node.c src/server.c

Author: tross
Date: Fri Jul 12 21:44:14 2013
New Revision: 1502698

URL: http://svn.apache.org/r1502698
Log:
QPID-4967 - Router code advances
  o Fixed handling of SASL on outbound connections
  o Added Send and Receive message paths in and out of Python modules
  o Overhauled the route-table data structures
    - Multicasting is now supported (multiple sender links with the same address)
    - Support has been added for message-based routing semantics as well as link-based
  o Two Dispatch processes connected to each other will now discover each other as neighbors

Modified:
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
    qpid/trunk/qpid/extras/dispatch/src/agent.c
    qpid/trunk/qpid/extras/dispatch/src/container.c
    qpid/trunk/qpid/extras/dispatch/src/parse.c
    qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
    qpid/trunk/qpid/extras/dispatch/src/router_node.c
    qpid/trunk/qpid/extras/dispatch/src/server.c

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h Fri Jul 12 21:44:14 2013
@@ -88,10 +88,10 @@ typedef struct {
 
 int dx_container_register_node_type(dx_dispatch_t *dispatch, const dx_node_type_t *nt);
 
-void dx_container_set_default_node_type(dx_dispatch_t        *dispatch,
-                                        const dx_node_type_t *nt,
-                                        void                 *node_context,
-                                        dx_dist_mode_t        supported_dist);
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t        *dispatch,
+                                              const dx_node_type_t *nt,
+                                              void                 *node_context,
+                                              dx_dist_mode_t        supported_dist);
 
 dx_node_t *dx_container_create_node(dx_dispatch_t        *dispatch,
                                     const dx_node_type_t *nt,

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Fri Jul 12 21:44:14 2013
@@ -31,7 +31,6 @@ typedef void (*dx_router_message_cb)(voi
 
 
 dx_address_t *dx_router_register_address(dx_dispatch_t        *dx,
-                                         bool                  is_local,
                                          const char           *address,
                                          dx_router_message_cb  handler,
                                          void                 *context);

Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/agent.c Fri Jul 12 21:44:14 2013
@@ -262,7 +262,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
     DEQ_INIT(agent->out_fifo);
     agent->lock    = sys_mutex();
     agent->timer   = dx_timer(dx, dx_agent_deferred_handler, agent);
-    agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
+    agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent);
 
     return agent;
 }

Modified: qpid/trunk/qpid/extras/dispatch/src/container.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/container.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/container.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/container.c Fri Jul 12 21:44:14 2013
@@ -513,10 +513,10 @@ int dx_container_register_node_type(dx_d
 }
 
 
-void dx_container_set_default_node_type(dx_dispatch_t        *dx,
-                                        const dx_node_type_t *nt,
-                                        void                 *context,
-                                        dx_dist_mode_t        supported_dist)
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t        *dx,
+                                              const dx_node_type_t *nt,
+                                              void                 *context,
+                                              dx_dist_mode_t        supported_dist)
 {
     dx_container_t *container = dx->container;
 
@@ -530,6 +530,8 @@ void dx_container_set_default_node_type(
         container->default_node = 0;
         dx_log(module, LOG_TRACE, "Default node removed");
     }
+
+    return container->default_node;
 }
 
 

Modified: qpid/trunk/qpid/extras/dispatch/src/parse.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/parse.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/parse.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/parse.c Fri Jul 12 21:44:14 2013
@@ -37,13 +37,14 @@ ALLOC_DECLARE(dx_parsed_field_t);
 ALLOC_DEFINE(dx_parsed_field_t);
 
 
-static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count)
+static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen)
 {
     if (dx_field_iterator_end(iter))
         return "Insufficient Data to Determine Tag";
-    *tag    = dx_field_iterator_octet(iter);
-    *count  = 0;
-    *length = 0;
+    *tag      = dx_field_iterator_octet(iter);
+    *count    = 0;
+    *length   = 0;
+    *clen     = 0;
 
     switch (*tag & 0xF0) {
     case 0x40: *length = 0;  break;
@@ -78,6 +79,7 @@ static char *get_type_info(dx_field_iter
         *count += ((unsigned int) dx_field_iterator_octet(iter)) << 24;
         *count += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
         *count += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
+        *clen = 3;
         // fall through to the next case
         
     case 0xC0:
@@ -85,6 +87,7 @@ static char *get_type_info(dx_field_iter
         if (dx_field_iterator_end(iter))
             return "Insufficient Data to Determine Count";
         *count += (unsigned int) dx_field_iterator_octet(iter);
+        *clen += 1;
         break;
     }
 
@@ -108,13 +111,13 @@ static dx_parsed_field_t *dx_parse_inter
 
     uint32_t length;
     uint32_t count;
+    uint32_t length_of_count;
 
-    field->parse_error = get_type_info(iter, &field->tag, &length, &count);
+    field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count);
 
     if (!field->parse_error) {
         field->raw_iter = dx_field_iterator_sub(iter, length);
-        if (count == 0 && length > 0)
-            dx_field_iterator_advance(iter, length);
+        dx_field_iterator_advance(iter, length - length_of_count);
         for (uint32_t idx = 0; idx < count; idx++) {
             dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field);
             DEQ_INSERT_TAIL(field->children, child);

Modified: qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/python_embedded.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/python_embedded.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/python_embedded.c Fri Jul 12 21:44:14 2013
@@ -396,6 +396,7 @@ static PyTypeObject LogAdapterType = {
 typedef struct {
     PyObject_HEAD
     PyObject       *handler;
+    PyObject       *handler_rx_call;
     dx_dispatch_t  *dx;
     dx_address_t   *address;
 } IoAdapter;
@@ -403,9 +404,66 @@ typedef struct {
 
 static void dx_io_rx_handler(void *context, dx_message_t *msg)
 {
-    //IoAdapter *self = (IoAdapter*) context;
+    IoAdapter *self = (IoAdapter*) context;
 
-    // TODO - Parse the incoming message and send it to the python handler.
+    //
+    // Parse the message through the body and exit if the message is not well formed.
+    //
+    if (!dx_message_check(msg, DX_DEPTH_BODY))
+        return;
+
+    //
+    // Get an iterator for the application-properties.  Exit if the message has none.
+    //
+    dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES);
+    if (ap == 0)
+        return;
+
+    //
+    // Try to get a map-view of the application-properties.
+    //
+    dx_parsed_field_t *ap_map = dx_parse(ap);
+    if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) {
+        dx_field_iterator_free(ap);
+        dx_parse_free(ap_map);
+        return;
+    }
+
+    //
+    // Get an iterator for the body.  Exit if the message has none.
+    //
+    dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY);
+    if (body == 0) {
+        dx_field_iterator_free(ap);
+        dx_parse_free(ap_map);
+        return;
+    }
+
+    //
+    // Try to get a map-view of the body.
+    //
+    dx_parsed_field_t *body_map = dx_parse(body);
+    if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) {
+        printf("XXXX %s\n", dx_parse_error(body_map));
+        dx_field_iterator_free(ap);
+        dx_field_iterator_free(body);
+        dx_parse_free(ap_map);
+        dx_parse_free(body_map);
+        return;
+    }
+
+    PyObject *pAP   = dx_field_to_py(ap_map);
+    PyObject *pBody = dx_field_to_py(body_map);
+
+    PyObject *pArgs = PyTuple_New(2);
+    PyTuple_SetItem(pArgs, 0, pAP);
+    PyTuple_SetItem(pArgs, 1, pBody);
+
+    PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
+    Py_DECREF(pArgs);
+    if (pValue) {
+        Py_DECREF(pValue);
+    }
 }
 
 
@@ -415,9 +473,14 @@ static int IoAdapter_init(IoAdapter *sel
     if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
         return -1;
 
+    self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
+    if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
+        return -1;
+
     Py_INCREF(self->handler);
+    Py_INCREF(self->handler_rx_call);
     self->dx = dispatch;
-    self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self);
+    self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
     return 0;
 }
 
@@ -426,6 +489,7 @@ static void IoAdapter_dealloc(IoAdapter*
 {
     dx_router_unregister_address(self->address);
     Py_DECREF(self->handler);
+    Py_DECREF(self->handler_rx_call);
     self->ob_type->tp_free((PyObject*)self);
 }
 

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Fri Jul 12 21:44:14 2013
@@ -28,8 +28,9 @@ static char *module = "ROUTER";
 static void dx_router_python_setup(dx_router_t *router);
 static void dx_pyrouter_tick(dx_router_t *router);
 
-//static char *local_prefix = "_local/";
-//static char *topo_prefix  = "_topo/";
+static char *router_address = "_local/qdxrouter";
+static char *local_prefix   = "_local/";
+//static char *topo_prefix    = "_topo/";
 
 /**
  * Address Types and Processing:
@@ -46,54 +47,74 @@ static void dx_pyrouter_tick(dx_router_t
  *   <mobile>                             M<mobile>      forward+handler   forward
  */
 
-struct dx_router_t {
-    dx_dispatch_t      *dx;
-    const char         *router_area;
-    const char         *router_id;
-    dx_node_t          *node;
-    dx_link_list_t      in_links;
-    dx_link_list_t      out_links;
-    dx_message_list_t   in_fifo;
-    sys_mutex_t        *lock;
-    dx_timer_t         *timer;
-    hash_t             *out_hash;
-    uint64_t            dtag;
-    PyObject           *pyRouter;
-    PyObject           *pyTick;
-};
 
+typedef struct dx_router_link_t dx_router_link_t;
+typedef struct dx_router_node_t dx_router_node_t;
 
-typedef struct {
-    dx_link_t         *link;
-    dx_message_list_t  out_fifo;
-} dx_router_link_t;
+
+typedef enum {
+    DX_LINK_ENDPOINT,   // A link to a connected endpoint
+    DX_LINK_ROUTER,     // A link to a peer router in the same area
+    DX_LINK_AREA        // A link to a peer router in a different area (area boundary)
+} dx_link_type_t;
+
+
+struct dx_router_link_t {
+    DEQ_LINKS(dx_router_link_t);
+    dx_direction_t     link_direction;
+    dx_link_type_t     link_type;
+    dx_address_t      *owning_addr;     // [ref] Address record that owns this link
+    dx_link_t         *link;            // [own] Link pointer
+    dx_router_link_t  *connected_link;  // [ref] If this is a link-route, reference the connected link
+    dx_router_link_t  *peer_link;       // [ref] If this is a bidirectional link-route, reference the peer link
+    dx_message_list_t  out_fifo;        // Message FIFO for outgoing messages
+};
 
 ALLOC_DECLARE(dx_router_link_t);
 ALLOC_DEFINE(dx_router_link_t);
+DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
 
-
-typedef struct {
+struct dx_router_node_t {
+    DEQ_LINKS(dx_router_node_t);
     const char       *id;
-    dx_router_link_t *next_hop;
+    dx_router_node_t *next_hop;   // Next hop node _if_ this is not a neighbor node
+    dx_router_link_t *peer_link;  // Outgoing link _if_ this is a neighbor node
     // list of valid origins (pointers to router_node) - (bit masks?)
-} dx_router_node_t;
+};
 
 ALLOC_DECLARE(dx_router_node_t);
 ALLOC_DEFINE(dx_router_node_t);
+DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
 
 
 struct dx_address_t {
-    int                   is_local;
-    dx_router_message_cb  handler;           // In-Process Consumer
-    void                 *handler_context;
-    dx_router_link_t     *rlink;             // Locally-Connected Consumer  - TODO: Make this a list
-    dx_router_node_t     *rnode;             // Remotely-Connected Consumer - TODO: Make this a list
+    dx_router_message_cb   handler;          // In-Process Consumer
+    void                  *handler_context;
+    dx_router_link_list_t  rlinks;           // Locally-Connected Consumers
+    dx_router_node_list_t  rnodes;           // Remotely-Connected Consumers
 };
 
 ALLOC_DECLARE(dx_address_t);
 ALLOC_DEFINE(dx_address_t);
 
 
+struct dx_router_t {
+    dx_dispatch_t         *dx;
+    const char            *router_area;
+    const char            *router_id;
+    dx_node_t             *node;
+    dx_router_link_list_t  in_links;
+    dx_router_node_list_t  routers;
+    dx_message_list_t      in_fifo;
+    sys_mutex_t           *lock;
+    dx_timer_t            *timer;
+    hash_t                *out_hash;
+    uint64_t               dtag;
+    PyObject              *pyRouter;
+    PyObject              *pyTick;
+};
+
+
 /**
  * Outbound Delivery Handler
  */
@@ -141,10 +162,11 @@ static void router_tx_handler(void* cont
  */
 static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
 {
-    dx_router_t  *router  = (dx_router_t*) context;
-    pn_link_t    *pn_link = pn_delivery_link(delivery);
-    dx_message_t *msg;
-    int           valid_message = 0;
+    dx_router_t      *router  = (dx_router_t*) context;
+    pn_link_t        *pn_link = pn_delivery_link(delivery);
+    dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
+    dx_message_t     *msg;
+    int               valid_message = 0;
 
     //
     // Receive the message into a local representation.  If the returned message
@@ -158,20 +180,49 @@ static void router_rx_handler(void* cont
         return;
 
     //
-    // Validate the message through the Properties section
+    // Consume the delivery and issue a replacement credit
     //
-    valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
-
     pn_link_advance(pn_link);
     pn_link_flow(pn_link, 1);
 
+    sys_mutex_lock(router->lock);
+
+    //
+    // Handle the Link-Routing case.  If this incoming link is associated with a connected
+    // link, simply deliver the message to the outgoing link.  There is no need to validate
+    // the message in this case.
+    //
+    if (rlink->connected_link) {
+        dx_router_link_t *clink      = rlink->connected_link;
+        pn_link_t        *pn_outlink = dx_link_pn(clink->link);
+        DEQ_INSERT_TAIL(clink->out_fifo, msg);
+        sys_mutex_unlock(router->lock);
+
+        pn_link_offered(pn_outlink, DEQ_SIZE(clink->out_fifo));
+        dx_link_activate(clink->link);
+        sys_mutex_unlock(router->lock);
+
+        return;
+    }
+
+    //
+    // We are performing Message-Routing, therefore we will need to validate the message
+    // through the Properties section so we can access the TO field.
+    //
+    dx_message_t         *in_process_copy = 0;
+    dx_router_message_cb  handler         = 0;
+    void                 *handler_context = 0;
+
+    valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
+
     if (valid_message) {
         dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
         dx_address_t        *addr;
         if (iter) {
             dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-            sys_mutex_lock(router->lock);
             hash_retrieve(router->out_hash, iter, (void*) &addr);
+            dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+            int is_local = dx_field_iterator_prefix(iter, local_prefix);
             dx_field_iterator_free(iter);
 
             if (addr) {
@@ -179,51 +230,71 @@ static void router_rx_handler(void* cont
                 // To field is valid and contains a known destination.  Handle the various
                 // cases for forwarding.
                 //
-                // Forward to the in-process handler for this message if there is one.
-                // Note: If the handler is going to queue the message for deferred processing,
-                //       it must copy the message.  This function assumes that the handler
-                //       will process the message synchronously and be finished with it upon
-                //       completion.
-                //
-                if (addr->handler)
-                    addr->handler(addr->handler_context, msg);
-
-                //
-                // Forward to the local link for the locally-connected consumer, if present.
-                // TODO - Don't forward if this is a "_local" address.
-                //
-                if (addr->rlink) {
-                    pn_link_t    *pn_outlink = dx_link_pn(addr->rlink->link);
-                    dx_message_t *copy       = dx_message_copy(msg);
-                    DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
-                    pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
-                    dx_link_activate(addr->rlink->link);
+
+                //
+                // Forward to the in-process handler for this message if there is one.  The
+                // actual invocation of the handler will occur later after we've released
+                // the lock.
+                //
+                if (addr->handler) {
+                    in_process_copy = dx_message_copy(msg);
+                    handler         = addr->handler;
+                    handler_context = addr->handler_context;
                 }
 
                 //
-                // Forward to the next-hop for a remotely-connected consumer, if present.
-                // Don't forward if this is a "_local" address.
+                // If the address form is local (i.e. is prefixed by _local), don't forward
+                // outside of the router process.
                 //
-                if (addr->rnode) {
-                    // TODO
+                if (!is_local) {
+                    //
+                    // Forward to all of the local links receiving this address.
+                    //
+                    dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+                    while (dest_link) {
+                        pn_link_t    *pn_outlink = dx_link_pn(dest_link->link);
+                        dx_message_t *copy       = dx_message_copy(msg);
+                        DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
+                        pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+                        dx_link_activate(dest_link->link);
+                        dest_link = DEQ_NEXT(dest_link);
+                    }
+
+                    //
+                    // Forward to the next-hops for remote destinations.
+                    //
+                    dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+                    while (dest_node) {
+                        if (dest_node->next_hop)
+                            dest_link = dest_node->next_hop->peer_link;
+                        else
+                            dest_link = dest_node->peer_link;
+                        if (dest_link) {
+                            pn_link_t    *pn_outlink = dx_link_pn(dest_link->link);
+                            dx_message_t *copy       = dx_message_copy(msg);
+                            DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
+                            pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+                            dx_link_activate(dest_link->link);
+                        }
+                        dest_node = DEQ_NEXT(dest_node);
+                    }
                 }
 
             } else {
                 //
                 // To field contains an unknown address.  Release the message.
                 //
+                // TODO - Undeliverable processing
                 pn_delivery_update(delivery, PN_RELEASED);
                 pn_delivery_settle(delivery);
             }
 
-            sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
-            dx_free_message(msg);
-
             //
-            // If this was a pre-settled delivery, we must also locally settle it.
+            // Since we are message-routing, there is no end-to-end disposition or
+            // settlement.  Accept and settle the delivery now.
             //
-            if (pn_delivery_settled(delivery))
-                pn_delivery_settle(delivery);
+            pn_delivery_update(delivery, PN_ACCEPTED);
+            pn_delivery_settle(delivery);
         }
     } else {
         //
@@ -232,8 +303,16 @@ static void router_rx_handler(void* cont
         pn_delivery_update(delivery, PN_REJECTED);
         pn_delivery_settle(delivery);
         pn_delivery_set_context(delivery, 0);
-        dx_free_message(msg);
     }
+
+    sys_mutex_unlock(router->lock);
+    dx_free_message(msg);
+
+    //
+    // Invoke the in-process handler now that the lock is released.
+    //
+    if (handler)
+        handler(handler_context, in_process_copy);
 }
 
 
@@ -242,7 +321,14 @@ static void router_rx_handler(void* cont
  */
 static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
 {
-    pn_link_t *pn_link = pn_delivery_link(delivery);
+    pn_link_t        *pn_link = pn_delivery_link(delivery);
+    //dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
+
+    //
+    // TODO - Propagate disposition and settlement between deliveries on a link-routed
+    //        link pair.
+    //
+    return;
 
     if (pn_link_is_sender(pn_link)) {
         uint64_t       disp     = pn_delivery_remote_state(delivery);
@@ -290,25 +376,35 @@ static void router_disp_handler(void* co
  */
 static int router_incoming_link_handler(void* context, dx_link_t *link)
 {
-    dx_router_t    *router  = (dx_router_t*) context;
-    dx_link_item_t *item    = new_dx_link_item_t();
-    pn_link_t      *pn_link = dx_link_pn(link);
-
-    if (item) {
-        DEQ_ITEM_INIT(item);
-        item->link = link;
+    dx_router_t      *router  = (dx_router_t*) context;
+    dx_router_link_t *rlink   = new_dx_router_link_t();
+    pn_link_t        *pn_link = dx_link_pn(link);
 
-        sys_mutex_lock(router->lock);
-        DEQ_INSERT_TAIL(router->in_links, item);
-        sys_mutex_unlock(router->lock);
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_direction = DX_INCOMING;
+    rlink->link_type      = DX_LINK_ENDPOINT;
+    rlink->owning_addr    = 0;
+    rlink->link           = link;
+    rlink->connected_link = 0;
+    rlink->peer_link      = 0;
+    DEQ_INIT(rlink->out_fifo);  // Won't be used
+
+    dx_link_set_context(link, rlink);
+
+    sys_mutex_lock(router->lock);
+    DEQ_INSERT_TAIL(router->in_links, rlink);
+    sys_mutex_unlock(router->lock);
+
+    pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+    pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+    pn_link_flow(pn_link, 1000);
+    pn_link_open(pn_link);
+
+    //
+    // TODO - If the address has link-route semantics, create all associated
+    //        links needed to go with this one.
+    //
 
-        pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
-        pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
-        pn_link_flow(pn_link, 1000);
-        pn_link_open(pn_link);
-    } else {
-        pn_link_close(pn_link);
-    }
     return 0;
 }
 
@@ -327,41 +423,44 @@ static int router_outgoing_link_handler(
         return 0;
     }
 
-    dx_router_link_t *rlink = new_dx_router_link_t();
-    rlink->link = link;
+    dx_field_iterator_t *iter  = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+    dx_router_link_t    *rlink = new_dx_router_link_t();
+
+    int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
+
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_direction = DX_OUTGOING;
+    rlink->link_type      = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+    rlink->link           = link;
+    rlink->connected_link = 0;
+    rlink->peer_link      = 0;
     DEQ_INIT(rlink->out_fifo);
+
     dx_link_set_context(link, rlink);
 
+    dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
     dx_address_t *addr;
 
-    dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
-
     sys_mutex_lock(router->lock);
     hash_retrieve(router->out_hash, iter, (void**) &addr);
     if (!addr) {
         addr = new_dx_address_t();
-        addr->is_local        = 0;
         addr->handler         = 0;
         addr->handler_context = 0;
-        addr->rlink           = 0;
-        addr->rnode           = 0;
+        DEQ_INIT(addr->rlinks);
+        DEQ_INIT(addr->rnodes);
         hash_insert(router->out_hash, iter, addr);
     }
     dx_field_iterator_free(iter);
 
-    if (addr->rlink == 0) {
-        addr->rlink = rlink;
-        pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
-        pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
-        pn_link_open(pn_link);
-        sys_mutex_unlock(router->lock);
-        dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
-        return 0;
-    }
+    rlink->owning_addr = addr;
+    DEQ_INSERT_TAIL(addr->rlinks, rlink);
 
-    dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt);
-    pn_link_close(pn_link);
+    pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+    pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+    pn_link_open(pn_link);
     sys_mutex_unlock(router->lock);
+    dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
     return 0;
 }
 
@@ -403,45 +502,36 @@ static int router_writable_link_handler(
  */
 static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
 {
-    dx_router_t    *router  = (dx_router_t*) context;
-    pn_link_t      *pn_link = dx_link_pn(link);
-    const char     *r_tgt   = pn_terminus_get_address(pn_link_remote_target(pn_link));
-    dx_link_item_t *item;
+    dx_router_t      *router  = (dx_router_t*) context;
+    pn_link_t        *pn_link = dx_link_pn(link);
+    dx_router_link_t *rlink   = (dx_router_link_t*) dx_link_get_context(link);
+    const char       *r_tgt   = pn_terminus_get_address(pn_link_remote_target(pn_link));
 
-    if (!r_tgt)
+    if (!rlink)
         return 0;
 
     sys_mutex_lock(router->lock);
     if (pn_link_is_sender(pn_link)) {
-        item = DEQ_HEAD(router->out_links);
+        DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
 
-        dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
-        dx_address_t        *addr;
-        if (iter) {
-            hash_retrieve(router->out_hash, iter, (void**) &addr);
-            if (addr) {
-                hash_remove(router->out_hash, iter);
-                free_dx_router_link_t(addr->rlink);
-                free_dx_address_t(addr);
-                dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+        if ((rlink->owning_addr->handler == 0) &&
+            (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
+            (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
+            dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+            dx_address_t        *addr;
+            if (iter) {
+                hash_retrieve(router->out_hash, iter, (void**) &addr);
+                if (addr == rlink->owning_addr) {
+                    hash_remove(router->out_hash, iter);
+                    free_dx_router_link_t(rlink);
+                    free_dx_address_t(addr);
+                    dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+                }
+                dx_field_iterator_free(iter);
             }
-            dx_field_iterator_free(iter);
-        }
-    }
-    else
-        item = DEQ_HEAD(router->in_links);
-
-    while (item) {
-        if (item->link == link) {
-            if (pn_link_is_sender(pn_link))
-                DEQ_REMOVE(router->out_links, item);
-            else
-                DEQ_REMOVE(router->in_links, item);
-            free_dx_link_item_t(item);
-            break;
         }
-        item = item->next;
-    }
+    } else
+        DEQ_REMOVE(router->in_links, rlink);
 
     sys_mutex_unlock(router->lock);
     return 0;
@@ -455,6 +545,81 @@ static void router_inbound_open_handler(
 
 static void router_outbound_open_handler(void *type_context, dx_connection_t *conn)
 {
+    // TODO - Make sure this connection is annotated as an inter-router transport.
+    //        Ignore otherwise
+
+    dx_router_t         *router = (dx_router_t*) type_context;
+    dx_field_iterator_t *aiter  = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
+    dx_link_t           *sender;
+    dx_link_t           *receiver;
+    dx_router_link_t    *rlink;
+
+    //
+    // Create an incoming link and put it in the in-links collection.  The address
+    // of the remote source of this link is '_local/qdxrouter'.
+    //
+    receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
+    pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
+    pn_terminus_set_address(dx_link_target(receiver), router_address);
+
+    rlink = new_dx_router_link_t();
+
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_direction = DX_INCOMING;
+    rlink->link_type      = DX_LINK_ROUTER;
+    rlink->owning_addr    = 0;
+    rlink->link           = receiver;
+    rlink->connected_link = 0;
+    rlink->peer_link      = 0;
+    DEQ_INIT(rlink->out_fifo);  // Won't be used
+
+    dx_link_set_context(receiver, rlink);
+
+    sys_mutex_lock(router->lock);
+    DEQ_INSERT_TAIL(router->in_links, rlink);
+    sys_mutex_unlock(router->lock);
+
+    //
+    // Create an outgoing link with a local source of '_local/qdxrouter' and place
+    // it in the routing table.
+    //
+    sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
+    pn_terminus_set_address(dx_link_remote_target(sender), router_address);
+    pn_terminus_set_address(dx_link_source(sender), router_address);
+
+    rlink = new_dx_router_link_t();
+
+    DEQ_ITEM_INIT(rlink);
+    rlink->link_direction = DX_OUTGOING;
+    rlink->link_type      = DX_LINK_ROUTER;
+    rlink->link           = sender;
+    rlink->connected_link = 0;
+    rlink->peer_link      = 0;
+    DEQ_INIT(rlink->out_fifo);
+
+    dx_link_set_context(sender, rlink);
+
+    dx_address_t *addr;
+
+    sys_mutex_lock(router->lock);
+    hash_retrieve(router->out_hash, aiter, (void**) &addr);
+    if (!addr) {
+        addr = new_dx_address_t();
+        addr->handler         = 0;
+        addr->handler_context = 0;
+        DEQ_INIT(addr->rlinks);
+        DEQ_INIT(addr->rnodes);
+        hash_insert(router->out_hash, aiter, addr);
+    }
+
+    rlink->owning_addr = addr;
+    DEQ_INSERT_TAIL(addr->rlinks, rlink);
+    sys_mutex_unlock(router->lock);
+
+    pn_link_open(dx_link_pn(receiver));
+    pn_link_open(dx_link_pn(sender));
+    pn_link_flow(dx_link_pn(receiver), 1000);
+    dx_field_iterator_free(aiter);
 }
 
 
@@ -494,22 +659,23 @@ dx_router_t *dx_router(dx_dispatch_t *dx
     }
 
     dx_router_t *router = NEW(dx_router_t);
-    dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
 
-    DEQ_INIT(router->in_links);
-    DEQ_INIT(router->out_links);
-    DEQ_INIT(router->in_fifo);
+    router_node.type_context = router;
 
     router->dx          = dx;
-    router->lock        = sys_mutex();
     router->router_area = area;
     router->router_id   = id;
+    router->node        = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
+    DEQ_INIT(router->in_links);
+    DEQ_INIT(router->routers);
+    DEQ_INIT(router->in_fifo);
+    router->lock        = sys_mutex();
+    router->timer       = dx_timer(dx, dx_router_timer_handler, (void*) router);
+    router->out_hash    = hash(10, 32, 0);
+    router->dtag        = 1;
+    router->pyRouter    = 0;
+    router->pyTick      = 0;
 
-    router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
-
-    router->out_hash = hash(10, 32, 0);
-    router->dtag     = 1;
-    router->pyRouter = 0;
 
     //
     // Inform the field iterator module of this router's id and area.  The field iterator
@@ -547,46 +713,44 @@ void dx_router_free(dx_router_t *router)
 
 
 dx_address_t *dx_router_register_address(dx_dispatch_t        *dx,
-                                         bool                  is_local,
                                          const char           *address,
                                          dx_router_message_cb  handler,
                                          void                 *context)
 {
-    char                 addr[1000];
-    dx_address_t        *ad = new_dx_address_t();
+    char                 addr_string[1000];
+    dx_router_t         *router = dx->router;
+    dx_address_t        *addr;
     dx_field_iterator_t *iter;
-    int                  result;
 
-    if (!ad)
-        return 0;
+    strcpy(addr_string, "L");  // Local Hash-Key Space
+    strcat(addr_string, address);
+    iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
 
-    ad->is_local        = is_local;
-    ad->handler         = handler;
-    ad->handler_context = context;
-    ad->rlink           = 0;
-
-    if (ad->is_local)
-        strcpy(addr, "L");  // Local Hash-Key Space
-    else
-        strcpy(addr, "M");  // Mobile Hash-Key Space
-
-    strcat(addr, address);
-    iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST);
-    result = hash_insert(dx->router->out_hash, iter, ad);
-    dx_field_iterator_free(iter);
-    if (result != 0) {
-        free_dx_address_t(ad);
-        return 0;
+    sys_mutex_lock(router->lock);
+    hash_retrieve(router->out_hash, iter, (void**) &addr);
+    if (!addr) {
+        addr = new_dx_address_t();
+        addr->handler         = 0;
+        addr->handler_context = 0;
+        DEQ_INIT(addr->rlinks);
+        DEQ_INIT(addr->rnodes);
+        hash_insert(router->out_hash, iter, addr);
     }
+    dx_field_iterator_free(iter);
+
+    addr->handler         = handler;
+    addr->handler_context = context;
+
+    sys_mutex_unlock(router->lock);
 
     dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
-    return ad;
+    return addr;
 }
 
 
 void dx_router_unregister_address(dx_address_t *ad)
 {
-    free_dx_address_t(ad);
+    //free_dx_address_t(ad);
 }
 
 
@@ -601,12 +765,36 @@ void dx_router_send(dx_dispatch_t       
     sys_mutex_lock(router->lock);
     hash_retrieve(router->out_hash, address, (void*) &addr);
     if (addr) {
-        if (addr->rlink) {
-            pn_link_t    *pn_outlink = dx_link_pn(addr->rlink->link);
+        //
+        // Forward to all of the local links receiving this address.
+        //
+        dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+        while (dest_link) {
+            pn_link_t    *pn_outlink = dx_link_pn(dest_link->link);
             dx_message_t *copy       = dx_message_copy(msg);
-            DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
-            pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
-            dx_link_activate(addr->rlink->link);
+            DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
+            pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+            dx_link_activate(dest_link->link);
+            dest_link = DEQ_NEXT(dest_link);
+        }
+
+        //
+        // Forward to the next-hops for remote destinations.
+        //
+        dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+        while (dest_node) {
+            if (dest_node->next_hop)
+                dest_link = dest_node->next_hop->peer_link;
+            else
+                dest_link = dest_node->peer_link;
+            if (dest_link) {
+                pn_link_t    *pn_outlink = dx_link_pn(dest_link->link);
+                dx_message_t *copy       = dx_message_copy(msg);
+                DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
+                pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+                dx_link_activate(dest_link->link);
+            }
+            dest_node = DEQ_NEXT(dest_node);
         }
     }
     sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?

Modified: qpid/trunk/qpid/extras/dispatch/src/server.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/server.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/server.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/server.c Fri Jul 12 21:44:14 2013
@@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t *
 }
 
 
-static void thread_process_listeners(pn_driver_t *driver)
+static void thread_process_listeners(dx_server_t *dx_server)
 {
+    pn_driver_t     *driver   = dx_server->driver;
     pn_listener_t   *listener = pn_driver_listener(driver);
     pn_connector_t  *cxtr;
     dx_connection_t *ctx;
 
     while (listener) {
-        dx_log(module, LOG_TRACE, "Accepting Connection");
         cxtr = pn_listener_accept(listener);
+        dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr));
         ctx = new_dx_connection_t();
         ctx->state        = CONN_STATE_OPENING;
         ctx->owner_thread = CONTEXT_NO_OWNER;
         ctx->enqueued     = 0;
         ctx->pn_cxtr      = cxtr;
-        ctx->pn_conn      = 0;
         ctx->listener     = (dx_listener_t*) pn_listener_context(listener);
         ctx->connector    = 0;
         ctx->context      = ctx->listener->context;
         ctx->ufd          = 0;
 
+        pn_connection_t *conn = pn_connection();
+        pn_connection_set_container(conn, dx_server->container_name);
+        pn_connector_set_connection(cxtr, conn);
+        pn_connection_set_context(conn, ctx);
+        ctx->pn_conn = conn;
+
         //
         // Get a pointer to the transport so we can insert security components into it
         //
@@ -201,20 +207,12 @@ static int process_connector(dx_server_t
         // Call the handler that is appropriate for the connector's state.
         //
         switch (ctx->state) {
-        case CONN_STATE_CONNECTING:
-            if (!pn_connector_closed(cxtr)) {
-                //ctx->state = CONN_STATE_SASL_CLIENT;
-                assert(ctx->connector);
-                ctx->connector->state = CXTR_STATE_OPEN;
-                events = 1;
-            } else {
+        case CONN_STATE_CONNECTING: {
+            if (pn_connector_closed(cxtr)) {
                 ctx->state = CONN_STATE_FAILED;
                 events = 0;
+                break;
             }
-            break;
-
-        case CONN_STATE_OPENING:
-            ctx->state = CONN_STATE_OPERATIONAL;
 
             pn_connection_t *conn = pn_connection();
             pn_connection_set_container(conn, dx_server->container_name);
@@ -222,20 +220,71 @@ static int process_connector(dx_server_t
             pn_connection_set_context(conn, ctx);
             ctx->pn_conn = conn;
 
-            dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+            pn_transport_t           *tport  = pn_connector_transport(cxtr);
+            const dx_server_config_t *config = ctx->connector->config;
 
-            if (ctx->listener) {
-                ce = DX_CONN_EVENT_LISTENER_OPEN;
-            } else if (ctx->connector) {
-                ce = DX_CONN_EVENT_CONNECTOR_OPEN;
-                ctx->connector->delay = 0;
-            } else
-                assert(0);
+            //
+            // Set up SSL if appropriate
+            //
+            if (config->ssl_enabled) {
+                pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
+                pn_ssl_domain_set_credentials(domain,
+                                              config->ssl_certificate_file,
+                                              config->ssl_private_key_file,
+                                              config->ssl_password);
+
+                if (config->ssl_require_peer_authentication)
+                    pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db);
+
+                pn_ssl_t *ssl = pn_ssl(tport);
+                pn_ssl_init(ssl, domain, 0);
+                pn_ssl_domain_free(domain);
+            }
 
-            dx_server->conn_handler(dx_server->conn_handler_context,
-                                    ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+            //
+            // Set up SASL
+            //
+            pn_sasl_t *sasl = pn_sasl(tport);
+            pn_sasl_mechanisms(sasl, config->sasl_mechanisms);
+            pn_sasl_client(sasl);
+
+            ctx->state = CONN_STATE_OPENING;
+            assert(ctx->connector);
+            ctx->connector->state = CXTR_STATE_OPEN;
             events = 1;
             break;
+        }
+
+        case CONN_STATE_OPENING: {
+            pn_transport_t *tport = pn_connector_transport(cxtr);
+            pn_sasl_t      *sasl  = pn_sasl(tport);
+
+            if (pn_sasl_outcome(sasl) == PN_SASL_OK) {
+                ctx->state = CONN_STATE_OPERATIONAL;
+
+                dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+                if (ctx->listener) {
+                    ce = DX_CONN_EVENT_LISTENER_OPEN;
+                } else if (ctx->connector) {
+                    ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+                    ctx->connector->delay = 0;
+                } else
+                    assert(0);
+
+                dx_server->conn_handler(dx_server->conn_handler_context,
+                                        ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+                events = 1;
+                break;
+            }
+            else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) {
+                ctx->state = CONN_STATE_FAILED;
+                if (ctx->connector) {
+                    const dx_server_config_t *config = ctx->connector->config;
+                    dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+                }
+            }
+        }
 
         case CONN_STATE_OPERATIONAL:
             if (pn_connector_closed(cxtr)) {
@@ -411,7 +460,7 @@ static void *thread_run(void *arg)
                 //
                 // Process listeners (incoming connections).
                 //
-                thread_process_listeners(dx_server->driver);
+                thread_process_listeners(dx_server);
 
                 //
                 // Traverse the list of connectors-needing-service from the proton driver.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org