You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/07/12 23:44:15 UTC
svn commit: r1502698 - in /qpid/trunk/qpid/extras/dispatch:
include/qpid/dispatch/container.h include/qpid/dispatch/router.h
src/agent.c src/container.c src/parse.c src/python_embedded.c
src/router_node.c src/server.c
Author: tross
Date: Fri Jul 12 21:44:14 2013
New Revision: 1502698
URL: http://svn.apache.org/r1502698
Log:
QPID-4967 - Router code advances
o Fixed handling of SASL on outbound connections
o Added Send and Receive message paths in and out of Python modules
o Overhauled the route-table data structures
- Multicasting is now supported (multiple sender links with the same address)
- Support has been added for message-based routing semantics as well as link-based
o Two Dispatch processes connected to each other will now discover each other as neighbors
Modified:
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
qpid/trunk/qpid/extras/dispatch/src/agent.c
qpid/trunk/qpid/extras/dispatch/src/container.c
qpid/trunk/qpid/extras/dispatch/src/parse.c
qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
qpid/trunk/qpid/extras/dispatch/src/router_node.c
qpid/trunk/qpid/extras/dispatch/src/server.c
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h Fri Jul 12 21:44:14 2013
@@ -88,10 +88,10 @@ typedef struct {
int dx_container_register_node_type(dx_dispatch_t *dispatch, const dx_node_type_t *nt);
-void dx_container_set_default_node_type(dx_dispatch_t *dispatch,
- const dx_node_type_t *nt,
- void *node_context,
- dx_dist_mode_t supported_dist);
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dispatch,
+ const dx_node_type_t *nt,
+ void *node_context,
+ dx_dist_mode_t supported_dist);
dx_node_t *dx_container_create_node(dx_dispatch_t *dispatch,
const dx_node_type_t *nt,
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/router.h Fri Jul 12 21:44:14 2013
@@ -31,7 +31,6 @@ typedef void (*dx_router_message_cb)(voi
dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
- bool is_local,
const char *address,
dx_router_message_cb handler,
void *context);
Modified: qpid/trunk/qpid/extras/dispatch/src/agent.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/agent.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/agent.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/agent.c Fri Jul 12 21:44:14 2013
@@ -262,7 +262,7 @@ dx_agent_t *dx_agent(dx_dispatch_t *dx)
DEQ_INIT(agent->out_fifo);
agent->lock = sys_mutex();
agent->timer = dx_timer(dx, dx_agent_deferred_handler, agent);
- agent->address = dx_router_register_address(dx, true, "agent", dx_agent_rx_handler, agent);
+ agent->address = dx_router_register_address(dx, "agent", dx_agent_rx_handler, agent);
return agent;
}
Modified: qpid/trunk/qpid/extras/dispatch/src/container.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/container.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/container.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/container.c Fri Jul 12 21:44:14 2013
@@ -513,10 +513,10 @@ int dx_container_register_node_type(dx_d
}
-void dx_container_set_default_node_type(dx_dispatch_t *dx,
- const dx_node_type_t *nt,
- void *context,
- dx_dist_mode_t supported_dist)
+dx_node_t *dx_container_set_default_node_type(dx_dispatch_t *dx,
+ const dx_node_type_t *nt,
+ void *context,
+ dx_dist_mode_t supported_dist)
{
dx_container_t *container = dx->container;
@@ -530,6 +530,8 @@ void dx_container_set_default_node_type(
container->default_node = 0;
dx_log(module, LOG_TRACE, "Default node removed");
}
+
+ return container->default_node;
}
Modified: qpid/trunk/qpid/extras/dispatch/src/parse.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/parse.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/parse.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/parse.c Fri Jul 12 21:44:14 2013
@@ -37,13 +37,14 @@ ALLOC_DECLARE(dx_parsed_field_t);
ALLOC_DEFINE(dx_parsed_field_t);
-static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count)
+static char *get_type_info(dx_field_iterator_t *iter, uint8_t *tag, uint32_t *length, uint32_t *count, uint32_t *clen)
{
if (dx_field_iterator_end(iter))
return "Insufficient Data to Determine Tag";
- *tag = dx_field_iterator_octet(iter);
- *count = 0;
- *length = 0;
+ *tag = dx_field_iterator_octet(iter);
+ *count = 0;
+ *length = 0;
+ *clen = 0;
switch (*tag & 0xF0) {
case 0x40: *length = 0; break;
@@ -78,6 +79,7 @@ static char *get_type_info(dx_field_iter
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 24;
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 16;
*count += ((unsigned int) dx_field_iterator_octet(iter)) << 8;
+ *clen = 3;
// fall through to the next case
case 0xC0:
@@ -85,6 +87,7 @@ static char *get_type_info(dx_field_iter
if (dx_field_iterator_end(iter))
return "Insufficient Data to Determine Count";
*count += (unsigned int) dx_field_iterator_octet(iter);
+ *clen += 1;
break;
}
@@ -108,13 +111,13 @@ static dx_parsed_field_t *dx_parse_inter
uint32_t length;
uint32_t count;
+ uint32_t length_of_count;
- field->parse_error = get_type_info(iter, &field->tag, &length, &count);
+ field->parse_error = get_type_info(iter, &field->tag, &length, &count, &length_of_count);
if (!field->parse_error) {
field->raw_iter = dx_field_iterator_sub(iter, length);
- if (count == 0 && length > 0)
- dx_field_iterator_advance(iter, length);
+ dx_field_iterator_advance(iter, length - length_of_count);
for (uint32_t idx = 0; idx < count; idx++) {
dx_parsed_field_t *child = dx_parse_internal(field->raw_iter, field);
DEQ_INSERT_TAIL(field->children, child);
Modified: qpid/trunk/qpid/extras/dispatch/src/python_embedded.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/python_embedded.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/python_embedded.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/python_embedded.c Fri Jul 12 21:44:14 2013
@@ -396,6 +396,7 @@ static PyTypeObject LogAdapterType = {
typedef struct {
PyObject_HEAD
PyObject *handler;
+ PyObject *handler_rx_call;
dx_dispatch_t *dx;
dx_address_t *address;
} IoAdapter;
@@ -403,9 +404,66 @@ typedef struct {
static void dx_io_rx_handler(void *context, dx_message_t *msg)
{
- //IoAdapter *self = (IoAdapter*) context;
+ IoAdapter *self = (IoAdapter*) context;
- // TODO - Parse the incoming message and send it to the python handler.
+ //
+ // Parse the message through the body and exit if the message is not well formed.
+ //
+ if (!dx_message_check(msg, DX_DEPTH_BODY))
+ return;
+
+ //
+ // Get an iterator for the application-properties. Exit if the message has none.
+ //
+ dx_field_iterator_t *ap = dx_message_field_iterator(msg, DX_FIELD_APPLICATION_PROPERTIES);
+ if (ap == 0)
+ return;
+
+ //
+ // Try to get a map-view of the application-properties.
+ //
+ dx_parsed_field_t *ap_map = dx_parse(ap);
+ if (ap_map == 0 || !dx_parse_ok(ap_map) || !dx_parse_is_map(ap_map)) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(ap_map);
+ return;
+ }
+
+ //
+ // Get an iterator for the body. Exit if the message has none.
+ //
+ dx_field_iterator_t *body = dx_message_field_iterator(msg, DX_FIELD_BODY);
+ if (body == 0) {
+ dx_field_iterator_free(ap);
+ dx_parse_free(ap_map);
+ return;
+ }
+
+ //
+ // Try to get a map-view of the body.
+ //
+ dx_parsed_field_t *body_map = dx_parse(body);
+ if (body_map == 0 || !dx_parse_ok(body_map) || !dx_parse_is_map(body_map)) {
+ printf("XXXX %s\n", dx_parse_error(body_map));
+ dx_field_iterator_free(ap);
+ dx_field_iterator_free(body);
+ dx_parse_free(ap_map);
+ dx_parse_free(body_map);
+ return;
+ }
+
+ PyObject *pAP = dx_field_to_py(ap_map);
+ PyObject *pBody = dx_field_to_py(body_map);
+
+ PyObject *pArgs = PyTuple_New(2);
+ PyTuple_SetItem(pArgs, 0, pAP);
+ PyTuple_SetItem(pArgs, 1, pBody);
+
+ PyObject *pValue = PyObject_CallObject(self->handler_rx_call, pArgs);
+ Py_DECREF(pArgs);
+ if (pValue) {
+ Py_DECREF(pValue);
+ }
}
@@ -415,9 +473,14 @@ static int IoAdapter_init(IoAdapter *sel
if (!PyArg_ParseTuple(args, "Os", &self->handler, &address))
return -1;
+ self->handler_rx_call = PyObject_GetAttrString(self->handler, "receive");
+ if (!self->handler_rx_call || !PyCallable_Check(self->handler_rx_call))
+ return -1;
+
Py_INCREF(self->handler);
+ Py_INCREF(self->handler_rx_call);
self->dx = dispatch;
- self->address = dx_router_register_address(self->dx, true, address, dx_io_rx_handler, self);
+ self->address = dx_router_register_address(self->dx, address, dx_io_rx_handler, self);
return 0;
}
@@ -426,6 +489,7 @@ static void IoAdapter_dealloc(IoAdapter*
{
dx_router_unregister_address(self->address);
Py_DECREF(self->handler);
+ Py_DECREF(self->handler_rx_call);
self->ob_type->tp_free((PyObject*)self);
}
Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Fri Jul 12 21:44:14 2013
@@ -28,8 +28,9 @@ static char *module = "ROUTER";
static void dx_router_python_setup(dx_router_t *router);
static void dx_pyrouter_tick(dx_router_t *router);
-//static char *local_prefix = "_local/";
-//static char *topo_prefix = "_topo/";
+static char *router_address = "_local/qdxrouter";
+static char *local_prefix = "_local/";
+//static char *topo_prefix = "_topo/";
/**
* Address Types and Processing:
@@ -46,54 +47,74 @@ static void dx_pyrouter_tick(dx_router_t
* <mobile> M<mobile> forward+handler forward
*/
-struct dx_router_t {
- dx_dispatch_t *dx;
- const char *router_area;
- const char *router_id;
- dx_node_t *node;
- dx_link_list_t in_links;
- dx_link_list_t out_links;
- dx_message_list_t in_fifo;
- sys_mutex_t *lock;
- dx_timer_t *timer;
- hash_t *out_hash;
- uint64_t dtag;
- PyObject *pyRouter;
- PyObject *pyTick;
-};
+typedef struct dx_router_link_t dx_router_link_t;
+typedef struct dx_router_node_t dx_router_node_t;
-typedef struct {
- dx_link_t *link;
- dx_message_list_t out_fifo;
-} dx_router_link_t;
+
+typedef enum {
+ DX_LINK_ENDPOINT, // A link to a connected endpoint
+ DX_LINK_ROUTER, // A link to a peer router in the same area
+ DX_LINK_AREA // A link to a peer router in a different area (area boundary)
+} dx_link_type_t;
+
+
+struct dx_router_link_t {
+ DEQ_LINKS(dx_router_link_t);
+ dx_direction_t link_direction;
+ dx_link_type_t link_type;
+ dx_address_t *owning_addr; // [ref] Address record that owns this link
+ dx_link_t *link; // [own] Link pointer
+ dx_router_link_t *connected_link; // [ref] If this is a link-route, reference the connected link
+ dx_router_link_t *peer_link; // [ref] If this is a bidirectional link-route, reference the peer link
+ dx_message_list_t out_fifo; // Message FIFO for outgoing messages
+};
ALLOC_DECLARE(dx_router_link_t);
ALLOC_DEFINE(dx_router_link_t);
+DEQ_DECLARE(dx_router_link_t, dx_router_link_list_t);
-
-typedef struct {
+struct dx_router_node_t {
+ DEQ_LINKS(dx_router_node_t);
const char *id;
- dx_router_link_t *next_hop;
+ dx_router_node_t *next_hop; // Next hop node _if_ this is not a neighbor node
+ dx_router_link_t *peer_link; // Outgoing link _if_ this is a neighbor node
// list of valid origins (pointers to router_node) - (bit masks?)
-} dx_router_node_t;
+};
ALLOC_DECLARE(dx_router_node_t);
ALLOC_DEFINE(dx_router_node_t);
+DEQ_DECLARE(dx_router_node_t, dx_router_node_list_t);
struct dx_address_t {
- int is_local;
- dx_router_message_cb handler; // In-Process Consumer
- void *handler_context;
- dx_router_link_t *rlink; // Locally-Connected Consumer - TODO: Make this a list
- dx_router_node_t *rnode; // Remotely-Connected Consumer - TODO: Make this a list
+ dx_router_message_cb handler; // In-Process Consumer
+ void *handler_context;
+ dx_router_link_list_t rlinks; // Locally-Connected Consumers
+ dx_router_node_list_t rnodes; // Remotely-Connected Consumers
};
ALLOC_DECLARE(dx_address_t);
ALLOC_DEFINE(dx_address_t);
+struct dx_router_t {
+ dx_dispatch_t *dx;
+ const char *router_area;
+ const char *router_id;
+ dx_node_t *node;
+ dx_router_link_list_t in_links;
+ dx_router_node_list_t routers;
+ dx_message_list_t in_fifo;
+ sys_mutex_t *lock;
+ dx_timer_t *timer;
+ hash_t *out_hash;
+ uint64_t dtag;
+ PyObject *pyRouter;
+ PyObject *pyTick;
+};
+
+
/**
* Outbound Delivery Handler
*/
@@ -141,10 +162,11 @@ static void router_tx_handler(void* cont
*/
static void router_rx_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = pn_delivery_link(delivery);
- dx_message_t *msg;
- int valid_message = 0;
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ dx_message_t *msg;
+ int valid_message = 0;
//
// Receive the message into a local representation. If the returned message
@@ -158,20 +180,49 @@ static void router_rx_handler(void* cont
return;
//
- // Validate the message through the Properties section
+ // Consume the delivery and issue a replacement credit
//
- valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
-
pn_link_advance(pn_link);
pn_link_flow(pn_link, 1);
+ sys_mutex_lock(router->lock);
+
+ //
+ // Handle the Link-Routing case. If this incoming link is associated with a connected
+ // link, simply deliver the message to the outgoing link. There is no need to validate
+ // the message in this case.
+ //
+ if (rlink->connected_link) {
+ dx_router_link_t *clink = rlink->connected_link;
+ pn_link_t *pn_outlink = dx_link_pn(clink->link);
+ DEQ_INSERT_TAIL(clink->out_fifo, msg);
+ sys_mutex_unlock(router->lock);
+
+ pn_link_offered(pn_outlink, DEQ_SIZE(clink->out_fifo));
+ dx_link_activate(clink->link);
+ sys_mutex_unlock(router->lock);
+
+ return;
+ }
+
+ //
+ // We are performing Message-Routing, therefore we will need to validate the message
+ // through the Properties section so we can access the TO field.
+ //
+ dx_message_t *in_process_copy = 0;
+ dx_router_message_cb handler = 0;
+ void *handler_context = 0;
+
+ valid_message = dx_message_check(msg, DX_DEPTH_PROPERTIES);
+
if (valid_message) {
dx_field_iterator_t *iter = dx_message_field_iterator(msg, DX_FIELD_TO);
dx_address_t *addr;
if (iter) {
dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, iter, (void*) &addr);
+ dx_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+ int is_local = dx_field_iterator_prefix(iter, local_prefix);
dx_field_iterator_free(iter);
if (addr) {
@@ -179,51 +230,71 @@ static void router_rx_handler(void* cont
// To field is valid and contains a known destination. Handle the various
// cases for forwarding.
//
- // Forward to the in-process handler for this message if there is one.
- // Note: If the handler is going to queue the message for deferred processing,
- // it must copy the message. This function assumes that the handler
- // will process the message synchronously and be finished with it upon
- // completion.
- //
- if (addr->handler)
- addr->handler(addr->handler_context, msg);
-
- //
- // Forward to the local link for the locally-connected consumer, if present.
- // TODO - Don't forward if this is a "_local" address.
- //
- if (addr->rlink) {
- pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link);
- dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
- dx_link_activate(addr->rlink->link);
+
+ //
+ // Forward to the in-process handler for this message if there is one. The
+ // actual invocation of the handler will occur later after we've released
+ // the lock.
+ //
+ if (addr->handler) {
+ in_process_copy = dx_message_copy(msg);
+ handler = addr->handler;
+ handler_context = addr->handler_context;
}
//
- // Forward to the next-hop for a remotely-connected consumer, if present.
- // Don't forward if this is a "_local" address.
+ // If the address form is local (i.e. is prefixed by _local), don't forward
+ // outside of the router process.
//
- if (addr->rnode) {
- // TODO
+ if (!is_local) {
+ //
+ // Forward to all of the local links receiving this address.
+ //
+ dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+ while (dest_link) {
+ pn_link_t *pn_outlink = dx_link_pn(dest_link->link);
+ dx_message_t *copy = dx_message_copy(msg);
+ DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
+ pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+ dx_link_activate(dest_link->link);
+ dest_link = DEQ_NEXT(dest_link);
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+ while (dest_node) {
+ if (dest_node->next_hop)
+ dest_link = dest_node->next_hop->peer_link;
+ else
+ dest_link = dest_node->peer_link;
+ if (dest_link) {
+ pn_link_t *pn_outlink = dx_link_pn(dest_link->link);
+ dx_message_t *copy = dx_message_copy(msg);
+ DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
+ pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+ dx_link_activate(dest_link->link);
+ }
+ dest_node = DEQ_NEXT(dest_node);
+ }
}
} else {
//
// To field contains an unknown address. Release the message.
//
+ // TODO - Undeliverable processing
pn_delivery_update(delivery, PN_RELEASED);
pn_delivery_settle(delivery);
}
- sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
- dx_free_message(msg);
-
//
- // If this was a pre-settled delivery, we must also locally settle it.
+ // Since we are message-routing, there is no end-to-end disposition or
+ // settlement. Accept and settle the delivery now.
//
- if (pn_delivery_settled(delivery))
- pn_delivery_settle(delivery);
+ pn_delivery_update(delivery, PN_ACCEPTED);
+ pn_delivery_settle(delivery);
}
} else {
//
@@ -232,8 +303,16 @@ static void router_rx_handler(void* cont
pn_delivery_update(delivery, PN_REJECTED);
pn_delivery_settle(delivery);
pn_delivery_set_context(delivery, 0);
- dx_free_message(msg);
}
+
+ sys_mutex_unlock(router->lock);
+ dx_free_message(msg);
+
+ //
+ // Invoke the in-process handler now that the lock is released.
+ //
+ if (handler)
+ handler(handler_context, in_process_copy);
}
@@ -242,7 +321,14 @@ static void router_rx_handler(void* cont
*/
static void router_disp_handler(void* context, dx_link_t *link, pn_delivery_t *delivery)
{
- pn_link_t *pn_link = pn_delivery_link(delivery);
+ pn_link_t *pn_link = pn_delivery_link(delivery);
+ //dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+
+ //
+ // TODO - Propagate disposition and settlement between deliveries on a link-routed
+ // link pair.
+ //
+ return;
if (pn_link_is_sender(pn_link)) {
uint64_t disp = pn_delivery_remote_state(delivery);
@@ -290,25 +376,35 @@ static void router_disp_handler(void* co
*/
static int router_incoming_link_handler(void* context, dx_link_t *link)
{
- dx_router_t *router = (dx_router_t*) context;
- dx_link_item_t *item = new_dx_link_item_t();
- pn_link_t *pn_link = dx_link_pn(link);
-
- if (item) {
- DEQ_ITEM_INIT(item);
- item->link = link;
+ dx_router_t *router = (dx_router_t*) context;
+ dx_router_link_t *rlink = new_dx_router_link_t();
+ pn_link_t *pn_link = dx_link_pn(link);
- sys_mutex_lock(router->lock);
- DEQ_INSERT_TAIL(router->in_links, item);
- sys_mutex_unlock(router->lock);
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_INCOMING;
+ rlink->link_type = DX_LINK_ENDPOINT;
+ rlink->owning_addr = 0;
+ rlink->link = link;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->out_fifo); // Won't be used
+
+ dx_link_set_context(link, rlink);
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(router->in_links, rlink);
+ sys_mutex_unlock(router->lock);
+
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_flow(pn_link, 1000);
+ pn_link_open(pn_link);
+
+ //
+ // TODO - If the address has link-route semantics, create all associated
+ // links needed to go with this one.
+ //
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
- pn_link_flow(pn_link, 1000);
- pn_link_open(pn_link);
- } else {
- pn_link_close(pn_link);
- }
return 0;
}
@@ -327,41 +423,44 @@ static int router_outgoing_link_handler(
return 0;
}
- dx_router_link_t *rlink = new_dx_router_link_t();
- rlink->link = link;
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_NO_HOST);
+ dx_router_link_t *rlink = new_dx_router_link_t();
+
+ int is_router = dx_field_iterator_equal(iter, (unsigned char*) router_address);
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_OUTGOING;
+ rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
+ rlink->link = link;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
DEQ_INIT(rlink->out_fifo);
+
dx_link_set_context(link, rlink);
+ dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
dx_address_t *addr;
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
-
sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, iter, (void**) &addr);
if (!addr) {
addr = new_dx_address_t();
- addr->is_local = 0;
addr->handler = 0;
addr->handler_context = 0;
- addr->rlink = 0;
- addr->rnode = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
hash_insert(router->out_hash, iter, addr);
}
dx_field_iterator_free(iter);
- if (addr->rlink == 0) {
- addr->rlink = rlink;
- pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
- pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
- pn_link_open(pn_link);
- sys_mutex_unlock(router->lock);
- dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
- return 0;
- }
+ rlink->owning_addr = addr;
+ DEQ_INSERT_TAIL(addr->rlinks, rlink);
- dx_log(module, LOG_TRACE, "Address '%s' not registered as it already exists", r_tgt);
- pn_link_close(pn_link);
+ pn_terminus_copy(pn_link_source(pn_link), pn_link_remote_source(pn_link));
+ pn_terminus_copy(pn_link_target(pn_link), pn_link_remote_target(pn_link));
+ pn_link_open(pn_link);
sys_mutex_unlock(router->lock);
+ dx_log(module, LOG_TRACE, "Registered new local address: %s", r_tgt);
return 0;
}
@@ -403,45 +502,36 @@ static int router_writable_link_handler(
*/
static int router_link_detach_handler(void* context, dx_link_t *link, int closed)
{
- dx_router_t *router = (dx_router_t*) context;
- pn_link_t *pn_link = dx_link_pn(link);
- const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
- dx_link_item_t *item;
+ dx_router_t *router = (dx_router_t*) context;
+ pn_link_t *pn_link = dx_link_pn(link);
+ dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ const char *r_tgt = pn_terminus_get_address(pn_link_remote_target(pn_link));
- if (!r_tgt)
+ if (!rlink)
return 0;
sys_mutex_lock(router->lock);
if (pn_link_is_sender(pn_link)) {
- item = DEQ_HEAD(router->out_links);
+ DEQ_REMOVE(rlink->owning_addr->rlinks, rlink);
- dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
- if (iter) {
- hash_retrieve(router->out_hash, iter, (void**) &addr);
- if (addr) {
- hash_remove(router->out_hash, iter);
- free_dx_router_link_t(addr->rlink);
- free_dx_address_t(addr);
- dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+ if ((rlink->owning_addr->handler == 0) &&
+ (DEQ_SIZE(rlink->owning_addr->rlinks) == 0) &&
+ (DEQ_SIZE(rlink->owning_addr->rnodes) == 0)) {
+ dx_field_iterator_t *iter = dx_field_iterator_string(r_tgt, ITER_VIEW_ADDRESS_HASH);
+ dx_address_t *addr;
+ if (iter) {
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (addr == rlink->owning_addr) {
+ hash_remove(router->out_hash, iter);
+ free_dx_router_link_t(rlink);
+ free_dx_address_t(addr);
+ dx_log(module, LOG_TRACE, "Removed local address: %s", r_tgt);
+ }
+ dx_field_iterator_free(iter);
}
- dx_field_iterator_free(iter);
- }
- }
- else
- item = DEQ_HEAD(router->in_links);
-
- while (item) {
- if (item->link == link) {
- if (pn_link_is_sender(pn_link))
- DEQ_REMOVE(router->out_links, item);
- else
- DEQ_REMOVE(router->in_links, item);
- free_dx_link_item_t(item);
- break;
}
- item = item->next;
- }
+ } else
+ DEQ_REMOVE(router->in_links, rlink);
sys_mutex_unlock(router->lock);
return 0;
@@ -455,6 +545,81 @@ static void router_inbound_open_handler(
static void router_outbound_open_handler(void *type_context, dx_connection_t *conn)
{
+ // TODO - Make sure this connection is annotated as an inter-router transport.
+ // Ignore otherwise
+
+ dx_router_t *router = (dx_router_t*) type_context;
+ dx_field_iterator_t *aiter = dx_field_iterator_string(router_address, ITER_VIEW_ADDRESS_HASH);
+ dx_link_t *sender;
+ dx_link_t *receiver;
+ dx_router_link_t *rlink;
+
+ //
+ // Create an incoming link and put it in the in-links collection. The address
+ // of the remote source of this link is '_local/qdxrouter'.
+ //
+ receiver = dx_link(router->node, conn, DX_INCOMING, "inter-router-rx");
+ pn_terminus_set_address(dx_link_remote_source(receiver), router_address);
+ pn_terminus_set_address(dx_link_target(receiver), router_address);
+
+ rlink = new_dx_router_link_t();
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_INCOMING;
+ rlink->link_type = DX_LINK_ROUTER;
+ rlink->owning_addr = 0;
+ rlink->link = receiver;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->out_fifo); // Won't be used
+
+ dx_link_set_context(receiver, rlink);
+
+ sys_mutex_lock(router->lock);
+ DEQ_INSERT_TAIL(router->in_links, rlink);
+ sys_mutex_unlock(router->lock);
+
+ //
+ // Create an outgoing link with a local source of '_local/qdxrouter' and place
+ // it in the routing table.
+ //
+ sender = dx_link(router->node, conn, DX_OUTGOING, "inter-router-tx");
+ pn_terminus_set_address(dx_link_remote_target(sender), router_address);
+ pn_terminus_set_address(dx_link_source(sender), router_address);
+
+ rlink = new_dx_router_link_t();
+
+ DEQ_ITEM_INIT(rlink);
+ rlink->link_direction = DX_OUTGOING;
+ rlink->link_type = DX_LINK_ROUTER;
+ rlink->link = sender;
+ rlink->connected_link = 0;
+ rlink->peer_link = 0;
+ DEQ_INIT(rlink->out_fifo);
+
+ dx_link_set_context(sender, rlink);
+
+ dx_address_t *addr;
+
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, aiter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->out_hash, aiter, addr);
+ }
+
+ rlink->owning_addr = addr;
+ DEQ_INSERT_TAIL(addr->rlinks, rlink);
+ sys_mutex_unlock(router->lock);
+
+ pn_link_open(dx_link_pn(receiver));
+ pn_link_open(dx_link_pn(sender));
+ pn_link_flow(dx_link_pn(receiver), 1000);
+ dx_field_iterator_free(aiter);
}
@@ -494,22 +659,23 @@ dx_router_t *dx_router(dx_dispatch_t *dx
}
dx_router_t *router = NEW(dx_router_t);
- dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
- DEQ_INIT(router->in_links);
- DEQ_INIT(router->out_links);
- DEQ_INIT(router->in_fifo);
+ router_node.type_context = router;
router->dx = dx;
- router->lock = sys_mutex();
router->router_area = area;
router->router_id = id;
+ router->node = dx_container_set_default_node_type(dx, &router_node, (void*) router, DX_DIST_BOTH);
+ DEQ_INIT(router->in_links);
+ DEQ_INIT(router->routers);
+ DEQ_INIT(router->in_fifo);
+ router->lock = sys_mutex();
+ router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
+ router->out_hash = hash(10, 32, 0);
+ router->dtag = 1;
+ router->pyRouter = 0;
+ router->pyTick = 0;
- router->timer = dx_timer(dx, dx_router_timer_handler, (void*) router);
-
- router->out_hash = hash(10, 32, 0);
- router->dtag = 1;
- router->pyRouter = 0;
//
// Inform the field iterator module of this router's id and area. The field iterator
@@ -547,46 +713,44 @@ void dx_router_free(dx_router_t *router)
dx_address_t *dx_router_register_address(dx_dispatch_t *dx,
- bool is_local,
const char *address,
dx_router_message_cb handler,
void *context)
{
- char addr[1000];
- dx_address_t *ad = new_dx_address_t();
+ char addr_string[1000];
+ dx_router_t *router = dx->router;
+ dx_address_t *addr;
dx_field_iterator_t *iter;
- int result;
- if (!ad)
- return 0;
+ strcpy(addr_string, "L"); // Local Hash-Key Space
+ strcat(addr_string, address);
+ iter = dx_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
- ad->is_local = is_local;
- ad->handler = handler;
- ad->handler_context = context;
- ad->rlink = 0;
-
- if (ad->is_local)
- strcpy(addr, "L"); // Local Hash-Key Space
- else
- strcpy(addr, "M"); // Mobile Hash-Key Space
-
- strcat(addr, address);
- iter = dx_field_iterator_string(addr, ITER_VIEW_NO_HOST);
- result = hash_insert(dx->router->out_hash, iter, ad);
- dx_field_iterator_free(iter);
- if (result != 0) {
- free_dx_address_t(ad);
- return 0;
+ sys_mutex_lock(router->lock);
+ hash_retrieve(router->out_hash, iter, (void**) &addr);
+ if (!addr) {
+ addr = new_dx_address_t();
+ addr->handler = 0;
+ addr->handler_context = 0;
+ DEQ_INIT(addr->rlinks);
+ DEQ_INIT(addr->rnodes);
+ hash_insert(router->out_hash, iter, addr);
}
+ dx_field_iterator_free(iter);
+
+ addr->handler = handler;
+ addr->handler_context = context;
+
+ sys_mutex_unlock(router->lock);
dx_log(module, LOG_TRACE, "In-Process Address Registered: %s", address);
- return ad;
+ return addr;
}
void dx_router_unregister_address(dx_address_t *ad)
{
- free_dx_address_t(ad);
+ //free_dx_address_t(ad);
}
@@ -601,12 +765,36 @@ void dx_router_send(dx_dispatch_t
sys_mutex_lock(router->lock);
hash_retrieve(router->out_hash, address, (void*) &addr);
if (addr) {
- if (addr->rlink) {
- pn_link_t *pn_outlink = dx_link_pn(addr->rlink->link);
+ //
+ // Forward to all of the local links receiving this address.
+ //
+ dx_router_link_t *dest_link = DEQ_HEAD(addr->rlinks);
+ while (dest_link) {
+ pn_link_t *pn_outlink = dx_link_pn(dest_link->link);
dx_message_t *copy = dx_message_copy(msg);
- DEQ_INSERT_TAIL(addr->rlink->out_fifo, copy);
- pn_link_offered(pn_outlink, DEQ_SIZE(addr->rlink->out_fifo));
- dx_link_activate(addr->rlink->link);
+ DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
+ pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+ dx_link_activate(dest_link->link);
+ dest_link = DEQ_NEXT(dest_link);
+ }
+
+ //
+ // Forward to the next-hops for remote destinations.
+ //
+ dx_router_node_t *dest_node = DEQ_HEAD(addr->rnodes);
+ while (dest_node) {
+ if (dest_node->next_hop)
+ dest_link = dest_node->next_hop->peer_link;
+ else
+ dest_link = dest_node->peer_link;
+ if (dest_link) {
+ pn_link_t *pn_outlink = dx_link_pn(dest_link->link);
+ dx_message_t *copy = dx_message_copy(msg);
+ DEQ_INSERT_TAIL(dest_link->out_fifo, copy);
+ pn_link_offered(pn_outlink, DEQ_SIZE(dest_link->out_fifo));
+ dx_link_activate(dest_link->link);
+ }
+ dest_node = DEQ_NEXT(dest_node);
}
}
sys_mutex_unlock(router->lock); // TOINVESTIGATE Move this higher?
Modified: qpid/trunk/qpid/extras/dispatch/src/server.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/server.c?rev=1502698&r1=1502697&r2=1502698&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/server.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/server.c Fri Jul 12 21:44:14 2013
@@ -91,26 +91,32 @@ static dx_thread_t *thread(dx_server_t *
}
-static void thread_process_listeners(pn_driver_t *driver)
+static void thread_process_listeners(dx_server_t *dx_server)
{
+ pn_driver_t *driver = dx_server->driver;
pn_listener_t *listener = pn_driver_listener(driver);
pn_connector_t *cxtr;
dx_connection_t *ctx;
while (listener) {
- dx_log(module, LOG_TRACE, "Accepting Connection");
cxtr = pn_listener_accept(listener);
+ dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr));
ctx = new_dx_connection_t();
ctx->state = CONN_STATE_OPENING;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_cxtr = cxtr;
- ctx->pn_conn = 0;
ctx->listener = (dx_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
ctx->ufd = 0;
+ pn_connection_t *conn = pn_connection();
+ pn_connection_set_container(conn, dx_server->container_name);
+ pn_connector_set_connection(cxtr, conn);
+ pn_connection_set_context(conn, ctx);
+ ctx->pn_conn = conn;
+
//
// Get a pointer to the transport so we can insert security components into it
//
@@ -201,20 +207,12 @@ static int process_connector(dx_server_t
// Call the handler that is appropriate for the connector's state.
//
switch (ctx->state) {
- case CONN_STATE_CONNECTING:
- if (!pn_connector_closed(cxtr)) {
- //ctx->state = CONN_STATE_SASL_CLIENT;
- assert(ctx->connector);
- ctx->connector->state = CXTR_STATE_OPEN;
- events = 1;
- } else {
+ case CONN_STATE_CONNECTING: {
+ if (pn_connector_closed(cxtr)) {
ctx->state = CONN_STATE_FAILED;
events = 0;
+ break;
}
- break;
-
- case CONN_STATE_OPENING:
- ctx->state = CONN_STATE_OPERATIONAL;
pn_connection_t *conn = pn_connection();
pn_connection_set_container(conn, dx_server->container_name);
@@ -222,20 +220,71 @@ static int process_connector(dx_server_t
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
- dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+ pn_transport_t *tport = pn_connector_transport(cxtr);
+ const dx_server_config_t *config = ctx->connector->config;
- if (ctx->listener) {
- ce = DX_CONN_EVENT_LISTENER_OPEN;
- } else if (ctx->connector) {
- ce = DX_CONN_EVENT_CONNECTOR_OPEN;
- ctx->connector->delay = 0;
- } else
- assert(0);
+ //
+ // Set up SSL if appropriate
+ //
+ if (config->ssl_enabled) {
+ pn_ssl_domain_t *domain = pn_ssl_domain(PN_SSL_MODE_CLIENT);
+ pn_ssl_domain_set_credentials(domain,
+ config->ssl_certificate_file,
+ config->ssl_private_key_file,
+ config->ssl_password);
+
+ if (config->ssl_require_peer_authentication)
+ pn_ssl_domain_set_peer_authentication(domain, PN_SSL_VERIFY_PEER_NAME, config->ssl_trusted_certificate_db);
+
+ pn_ssl_t *ssl = pn_ssl(tport);
+ pn_ssl_init(ssl, domain, 0);
+ pn_ssl_domain_free(domain);
+ }
- dx_server->conn_handler(dx_server->conn_handler_context,
- ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ //
+ // Set up SASL
+ //
+ pn_sasl_t *sasl = pn_sasl(tport);
+ pn_sasl_mechanisms(sasl, config->sasl_mechanisms);
+ pn_sasl_client(sasl);
+
+ ctx->state = CONN_STATE_OPENING;
+ assert(ctx->connector);
+ ctx->connector->state = CXTR_STATE_OPEN;
events = 1;
break;
+ }
+
+ case CONN_STATE_OPENING: {
+ pn_transport_t *tport = pn_connector_transport(cxtr);
+ pn_sasl_t *sasl = pn_sasl(tport);
+
+ if (pn_sasl_outcome(sasl) == PN_SASL_OK) {
+ ctx->state = CONN_STATE_OPERATIONAL;
+
+ dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+
+ if (ctx->listener) {
+ ce = DX_CONN_EVENT_LISTENER_OPEN;
+ } else if (ctx->connector) {
+ ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+ ctx->connector->delay = 0;
+ } else
+ assert(0);
+
+ dx_server->conn_handler(dx_server->conn_handler_context,
+ ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ events = 1;
+ break;
+ }
+ else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) {
+ ctx->state = CONN_STATE_FAILED;
+ if (ctx->connector) {
+ const dx_server_config_t *config = ctx->connector->config;
+ dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+ }
+ }
+ }
case CONN_STATE_OPERATIONAL:
if (pn_connector_closed(cxtr)) {
@@ -411,7 +460,7 @@ static void *thread_run(void *arg)
//
// Process listeners (incoming connections).
//
- thread_process_listeners(dx_server->driver);
+ thread_process_listeners(dx_server);
//
// Traverse the list of connectors-needing-service from the proton driver.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org