You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/01/31 19:34:12 UTC
svn commit: r1656233 - in /qpid/dispatch/trunk:
include/qpid/dispatch/server.h src/lrp.c src/router_node.c src/server.c
src/server_private.h tests/config-2-broker/A.conf
Author: tross
Date: Sat Jan 31 18:34:12 2015
New Revision: 1656233
URL: http://svn.apache.org/r1656233
Log:
DISPATCH-6 - Completed link-attach propagation for senders
- Added deferred-call-on-connection in the server module
- Terminus lookup is a simple single prefix-by-dot algorithm for now
Modified:
qpid/dispatch/trunk/include/qpid/dispatch/server.h
qpid/dispatch/trunk/src/lrp.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/server.c
qpid/dispatch/trunk/src/server_private.h
qpid/dispatch/trunk/tests/config-2-broker/A.conf
Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Sat Jan 31 18:34:12 2015
@@ -49,6 +49,19 @@ typedef void (*qd_thread_start_cb_t)(voi
/**
+ * Deferred callback
+ *
+ * This type is for calls that are deferred until they can be invoked on
+ * a specific connection's thread.
+ *
+ * @param context An opaque context to be passed back with the call.
+ * @param discard If true, the call should be discarded because the connection it
+ * was pending on was deleted.
+ */
+typedef void (*qd_deferred_t)(void *context, bool discard);
+
+
+/**
* Set the optional thread-start handler.
*
* This handler is called once on each worker thread at the time the thread is
@@ -455,6 +468,17 @@ const qd_server_config_t *qd_connection_
/**
+ * Schedule a call to be invoked on a thread that has ownership of this connection.
+ * It will be safe for the callback to perform operations related to this connection.
+ *
+ * @param conn Connection object
+ * @param call The function to be invoked on the connection's thread
+ * @param context The context to be passed back in the callback
+ */
+void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, void *context);
+
+
+/**
* Create a listener for incoming connections.
*
* @param qd The dispatch handle returned by qd_dispatch.
Modified: qpid/dispatch/trunk/src/lrp.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/lrp.c?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/lrp.c (original)
+++ qpid/dispatch/trunk/src/lrp.c Sat Jan 31 18:34:12 2015
@@ -33,6 +33,8 @@ static void qd_lrpc_open_handler(void *c
qd_lrp_t *lrp = DEQ_HEAD(lrpc->lrps);
qd_router_t *router = lrpc->qd->router;
+ lrpc->conn = conn;
+
while (lrp) {
qd_address_t *addr;
qd_field_iterator_t *iter;
@@ -90,6 +92,8 @@ static void qd_lrpc_close_handler(void *
qd_lrp_t *lrp = DEQ_HEAD(lrpc->lrps);
qd_router_t *router = lrpc->qd->router;
+ lrpc->conn = 0;
+
while (lrp) {
qd_address_t *addr;
qd_field_iterator_t *iter;
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Sat Jan 31 18:34:12 2015
@@ -319,6 +319,36 @@ static int qd_router_find_mask_bit_LH(qd
/**
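+ * Look up the address-table entry for a terminus address.  The lookup key is the
+ * text of the terminus address up to and including its first '.' (the whole
+ * address if it contains no '.'), hashed with the prefix overridden to 'C'.
+ * Yields 0 for a null or empty terminus address.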
+ */
+static qd_address_t *router_lookup_terminus_LH(qd_router_t *router, const char *taddr)
+{
+    //
+    // Map a terminus address to an entry in router->addr_hash.  Returns 0 if the
+    // terminus address is null or empty.
+    //
+    // NOTE(review): the _LH suffix suggests the caller is expected to hold the
+    // router lock while calling this - confirm against the callers.
+    //
+    // For now: Find the first instance of a '.' in the address and search for the text
+    // up to and including this instance.  If there is no '.', the whole address is used.
+    //
+    if (taddr == 0 || *taddr == '\0')
+        return 0;
+
+    const char *cursor = taddr;
+    while (*cursor && *cursor != '.')
+        cursor++;
+    if (*cursor == '.')
+        cursor++;
+    int len = (int) (cursor - taddr);
+
+    //
+    // Hash only the prefix computed above, with the address prefix forced to 'C'.
+    //
+    qd_field_iterator_t *iter = qd_field_iterator_binary(taddr, len, ITER_VIEW_ADDRESS_HASH);
+    qd_field_iterator_override_prefix(iter, 'C');
+
+    //
+    // Start from "not found" so that the value returned is well defined even if
+    // the hash lookup does not store anything through its output pointer.
+    //
+    qd_address_t *addr = 0;
+    qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
+    qd_field_iterator_free(iter);
+
+    return addr;
+}
+
+
+/**
* Outgoing Link Writable Handler
*/
static int router_writable_link_handler(void* context, qd_link_t *link)
@@ -404,7 +434,7 @@ static int router_writable_link_handler(
if (qd_delivery_settled(re->delivery)) {
qd_link_t *peer_link = qd_delivery_link(re->delivery);
qd_router_link_t *peer_rlink = (qd_router_link_t*) qd_link_get_context(peer_link);
- qd_routed_event_t *return_re = new_qd_routed_event_t();
+ qd_routed_event_t *return_re = new_qd_routed_event_t();
DEQ_ITEM_INIT(return_re);
return_re->delivery = re->delivery;
return_re->message = 0;
@@ -953,7 +983,7 @@ static void router_rx_handler(void* cont
/**
* Delivery Disposition Handler
*/
-static void router_disp_handler(void* context, qd_link_t *link, qd_delivery_t *delivery)
+static void router_disposition_handler(void* context, qd_link_t *link, qd_delivery_t *delivery)
{
qd_router_t *router = (qd_router_t*) context;
bool changed = qd_delivery_disp_changed(delivery);
@@ -992,6 +1022,56 @@ static void router_disp_handler(void* co
}
+//
+// Context handed to qd_router_attach_routed_link through a deferred call.  It
+// carries what is needed to create a second link on another connection and
+// connect it to the link whose attach is being propagated.
+//
+typedef struct link_attach_t {
+ // Router whose node, lock and link list are used when the new link is created
+ qd_router_t *router;
+ // Router-level link that the new link is cross-connected with
+ qd_router_link_t *peer_link;
+ // Link whose remote source and target termini are copied onto the new link
+ qd_link_t *peer_qd_link;
+ // Heap-allocated name for the new link; released by the deferred call
+ char *link_name;
+ // Direction of the link to be created
+ qd_direction_t dir;
+ // Connection on which the new link is created
+ qd_connection_t *conn;
+} link_attach_t;
+
+ALLOC_DECLARE(link_attach_t);
+ALLOC_DEFINE(link_attach_t);
+
+
+//
+// Deferred call that creates the far side of a routed link.
+//
+// context is a link_attach_t describing the link to create; this function takes
+// ownership of it and of its link_name and releases both before returning.
+// If discard is true, no link is created and the context is only released.
+//
+// NOTE(review): la->peer_link and la->peer_qd_link are dereferenced here some
+// time after the call was scheduled - confirm that the peer link cannot be
+// detached and freed before this call runs.
+//
+static void qd_router_attach_routed_link(void *context, bool discard)
+{
+    link_attach_t *la = (link_attach_t*) context;
+
+    if (!discard) {
+        //
+        // Create the new link on the target connection and give it a fresh
+        // endpoint router-link as its context.
+        //
+        qd_link_t        *link  = qd_link(la->router->node, la->conn, la->dir, la->link_name);
+        qd_router_link_t *rlink = new_qd_router_link_t();
+        DEQ_ITEM_INIT(rlink);
+        rlink->link_type      = QD_LINK_ENDPOINT;
+        rlink->link_direction = la->dir;
+        rlink->owning_addr    = 0;
+        rlink->waypoint       = 0;
+        rlink->link           = link;
+        rlink->ref            = 0;
+        rlink->target         = 0;
+        DEQ_INIT(rlink->event_fifo);
+        DEQ_INIT(rlink->msg_fifo);
+        qd_link_set_context(link, rlink);
+
+        //
+        // Cross-connect the two router links and register the new one, under
+        // the router lock.
+        //
+        sys_mutex_lock(la->router->lock);
+        rlink->connected_link = la->peer_link;
+        la->peer_link->connected_link = rlink;
+        DEQ_INSERT_TAIL(la->router->links, rlink);
+        sys_mutex_unlock(la->router->lock);
+
+        //
+        // The new link carries the termini the peer's remote end asked for.
+        //
+        pn_terminus_copy(qd_link_source(link), qd_link_remote_source(la->peer_qd_link));
+        pn_terminus_copy(qd_link_target(link), qd_link_remote_target(la->peer_qd_link));
+
+        pn_link_open(qd_link_pn(link));
+    }
+
+    //
+    // Release the request in both the invoke and the discard case.  free()
+    // accepts a null pointer, so link_name needs no guard.
+    //
+    free(la->link_name);
+    free_link_attach_t(la);
+}
+
+
/**
* New Incoming Link Handler
*/
@@ -1003,7 +1083,8 @@ static int router_incoming_link_handler(
const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
- qd_log(router->log_source, QD_LOG_WARNING, "Incoming link claims router capability but is not on an inter-router connection");
+ qd_log(router->log_source, QD_LOG_WARNING,
+ "Incoming link claims router capability but is not on an inter-router connection");
pn_link_close(pn_link);
return 0;
}
@@ -1032,6 +1113,56 @@ static int router_incoming_link_handler(
rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
DEQ_INSERT_TAIL(router->links, rlink);
+
+    //
+    // Lookup the target address to see if we can link-route this attach.
+    //
+    qd_address_t *addr = router_lookup_terminus_LH(router, r_tgt);
+    if (addr && !is_router) {
+        //
+        // This is a link-attach routable target.  Propagate the attach downrange.
+        // Check first for a locally connected container.
+        //
+        qd_router_lrp_ref_t *lrpref = DEQ_HEAD(addr->lrps);
+        if (lrpref) {
+            qd_connection_t *conn = lrpref->lrp->container->conn;
+            if (conn) {
+                //
+                // Ownership of 'la' and of the copied link name passes to the
+                // deferred call, which runs on the container connection's thread.
+                //
+                link_attach_t *la = new_link_attach_t();
+                la->router       = router;
+                la->peer_link    = rlink;
+                la->peer_qd_link = link;
+                la->link_name    = strdup(pn_link_name(pn_link));
+                la->dir          = QD_OUTGOING;
+                la->conn         = conn;
+                qd_connection_invoke_deferred(conn, qd_router_attach_routed_link, la);
+            }
+        } else if (DEQ_SIZE(addr->rnodes) > 0) {
+            //
+            // There are no locally connected containers for this link but there is at
+            // least one on a remote router.  Forward the attach toward the remote destination.
+            //
+            qd_router_node_t *remote_router = DEQ_HEAD(addr->rnodes)->router;
+
+            //
+            // out_link must start out null: it is tested below even when
+            // remote_router is null and nothing has been assigned to it.
+            //
+            qd_router_link_t *out_link = 0;
+            if (remote_router)
+                out_link = remote_router->peer_link;
+            if (!out_link && remote_router && remote_router->next_hop)
+                out_link = remote_router->next_hop->peer_link;
+            if (out_link) {
+                qd_connection_t *out_conn = qd_link_connection(out_link->link);
+                if (out_conn) {
+                    link_attach_t *la = new_link_attach_t();
+                    la->router       = router;
+                    la->peer_link    = rlink;
+                    la->peer_qd_link = link;
+                    la->link_name    = strdup(pn_link_name(pn_link));
+                    la->dir          = QD_OUTGOING;
+                    la->conn         = out_conn;
+                    qd_connection_invoke_deferred(out_conn, qd_router_attach_routed_link, la);
+                }
+            }
+        }
+    }
+
sys_mutex_unlock(router->lock);
pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
@@ -1039,11 +1170,6 @@ static int router_incoming_link_handler(
pn_link_flow(pn_link, 1000);
pn_link_open(pn_link);
- //
- // TODO - If the address has link-route semantics, create all associated
- // links needed to go with this one.
- //
-
return 0;
}
@@ -1065,7 +1191,8 @@ static int router_outgoing_link_handler(
qd_address_t *addr = 0;
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
- qd_log(router->log_source, QD_LOG_WARNING, "Outgoing link claims router capability but is not on an inter-router connection");
+ qd_log(router->log_source, QD_LOG_WARNING,
+ "Outgoing link claims router capability but is not on an inter-router connection");
pn_link_close(pn_link);
return 0;
}
@@ -1092,7 +1219,8 @@ static int router_outgoing_link_handler(
if (prefix != 'M') {
qd_field_iterator_free(iter);
pn_link_close(pn_link);
- qd_log(router->log_source, QD_LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src);
+ qd_log(router->log_source, QD_LOG_WARNING,
+ "Rejected an outgoing endpoint link with a router address: %s", r_src);
return 0;
}
}
@@ -1244,7 +1372,8 @@ static int router_link_detach_handler(vo
if (router->out_links_by_mask_bit[rlink->mask_bit] == rlink)
router->out_links_by_mask_bit[rlink->mask_bit] = 0;
else
- qd_log(router->log_source, QD_LOG_CRITICAL, "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit);
+ qd_log(router->log_source, QD_LOG_CRITICAL,
+ "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit);
}
//
@@ -1322,7 +1451,8 @@ static void router_outbound_open_handler
receiver = qd_link(router->node, conn, QD_INCOMING, QD_INTERNODE_LINK_NAME_1);
// TODO - We don't want to have to cast away the constness of the literal string here!
// See PROTON-429
- pn_data_put_symbol(pn_terminus_capabilities(qd_link_target(receiver)), pn_bytes(clen, (char*) QD_CAPABILITY_ROUTER));
+ pn_data_put_symbol(pn_terminus_capabilities(qd_link_target(receiver)),
+ pn_bytes(clen, (char*) QD_CAPABILITY_ROUTER));
rlink = new_qd_router_link_t();
DEQ_ITEM_INIT(rlink);
@@ -1348,7 +1478,8 @@ static void router_outbound_open_handler
sender = qd_link(router->node, conn, QD_OUTGOING, QD_INTERNODE_LINK_NAME_2);
// TODO - We don't want to have to cast away the constness of the literal string here!
// See PROTON-429
- pn_data_put_symbol(pn_terminus_capabilities(qd_link_source(sender)), pn_bytes(clen, (char *) QD_CAPABILITY_ROUTER));
+ pn_data_put_symbol(pn_terminus_capabilities(qd_link_source(sender)),
+ pn_bytes(clen, (char *) QD_CAPABILITY_ROUTER));
rlink = new_qd_router_link_t();
DEQ_ITEM_INIT(rlink);
@@ -1400,7 +1531,7 @@ static void qd_router_timer_handler(void
static qd_node_type_t router_node = {"router", 0, 0,
router_rx_handler,
- router_disp_handler,
+ router_disposition_handler,
router_incoming_link_handler,
router_outgoing_link_handler,
router_writable_link_handler,
Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Sat Jan 31 18:34:12 2015
@@ -36,6 +36,7 @@ static __thread qd_server_t *thread_serv
ALLOC_DEFINE(qd_work_item_t);
ALLOC_DEFINE(qd_listener_t);
ALLOC_DEFINE(qd_connector_t);
+ALLOC_DEFINE(qd_deferred_call_t);
ALLOC_DEFINE(qd_connection_t);
ALLOC_DEFINE(qd_user_fd_t);
@@ -103,10 +104,10 @@ qd_error_t qd_entity_refresh_connection(
static void thread_process_listeners(qd_server_t *qd_server)
{
- qdpn_driver_t *driver = qd_server->driver;
- qdpn_listener_t *listener;
- qdpn_connector_t *cxtr;
- qd_connection_t *ctx;
+ qdpn_driver_t *driver = qd_server->driver;
+ qdpn_listener_t *listener;
+ qdpn_connector_t *cxtr;
+ qd_connection_t *ctx;
for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) {
cxtr = qdpn_listener_accept(listener);
@@ -127,6 +128,8 @@ static void thread_process_listeners(qd_
ctx->user_context = 0;
ctx->link_context = 0;
ctx->ufd = 0;
+ DEQ_INIT(ctx->deferred_calls);
+ ctx->deferred_call_lock = sys_mutex();
size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY);
pn_connection_t *conn = pn_connection();
@@ -229,6 +232,37 @@ static void block_if_paused_LH(qd_server
}
+//
+// Run (or discard) every deferred call currently queued on a connection.
+// Each call is invoked with the supplied discard flag and its record is freed.
+//
+static void invoke_deferred_calls(qd_connection_t *conn, bool discard)
+{
+    qd_deferred_call_list_t  pending;
+    qd_deferred_call_t      *item;
+
+    DEQ_INIT(pending);
+
+    //
+    // Drain the connection's queue into a private list while holding the lock.
+    //
+    sys_mutex_lock(conn->deferred_call_lock);
+    for (item = DEQ_HEAD(conn->deferred_calls); item; item = DEQ_HEAD(conn->deferred_calls)) {
+        DEQ_REMOVE_HEAD(conn->deferred_calls);
+        DEQ_INSERT_TAIL(pending, item);
+    }
+    sys_mutex_unlock(conn->deferred_call_lock);
+
+    //
+    // The callbacks run without the deferred-call lock held, in queue order.
+    //
+    for (item = DEQ_HEAD(pending); item; item = DEQ_HEAD(pending)) {
+        DEQ_REMOVE_HEAD(pending);
+        item->call(item->context, discard);
+        free_qd_deferred_call_t(item);
+    }
+}
+
+
static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
{
qd_connection_t *ctx = qdpn_connector_context(cxtr);
@@ -314,10 +348,12 @@ static int process_connector(qd_server_t
(qd_connection_t*) qdpn_connector_context(cxtr));
events = 0;
}
- else
+ else {
+ invoke_deferred_calls(ctx, false);
events = qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
QD_CONN_EVENT_PROCESS,
(qd_connection_t*) qdpn_connector_context(cxtr));
+ }
break;
default:
@@ -568,6 +604,8 @@ static void *thread_run(void *arg)
pn_connection_free(conn);
if (ctx->collector)
pn_collector_free(ctx->collector);
+ invoke_deferred_calls(ctx, true); // Discard any pending deferred calls
+ sys_mutex_free(ctx->deferred_call_lock);
free_qd_connection_t(ctx);
qd_server->threads_active--;
sys_mutex_unlock(qd_server->lock);
@@ -655,6 +693,8 @@ static void cxtr_try_open(void *context)
ctx->user_context = 0;
ctx->link_context = 0;
ctx->ufd = 0;
+ DEQ_INIT(ctx->deferred_calls);
+ ctx->deferred_call_lock = sys_mutex();
//
// qdpn_connector is not thread safe
@@ -995,6 +1035,21 @@ const qd_server_config_t *qd_connection_
}
+void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, void *context)
+{
+    //
+    // Build the queue entry before taking the lock so that only the list
+    // insertion happens inside the critical section.
+    //
+    qd_deferred_call_t *entry = new_qd_deferred_call_t();
+    DEQ_ITEM_INIT(entry);
+    entry->context = context;
+    entry->call    = call;
+
+    sys_mutex_lock(conn->deferred_call_lock);
+    DEQ_INSERT_TAIL(conn->deferred_calls, entry);
+    sys_mutex_unlock(conn->deferred_call_lock);
+
+    //
+    // Activate the connection once the entry is queued.
+    // NOTE(review): presumably this prompts a worker thread to process the
+    // connection, which is where queued calls get invoked - confirm.
+    //
+    qd_server_activate(conn);
+}
+
+
qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
{
qd_server_t *qd_server = qd->server;
@@ -1097,6 +1152,8 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *
ctx->user_context = 0;
ctx->link_context = 0;
ctx->ufd = ufd;
+ DEQ_INIT(ctx->deferred_calls);
+ ctx->deferred_call_lock = sys_mutex();
ufd->context = context;
ufd->server = qd_server;
Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Sat Jan 31 18:34:12 2015
@@ -78,6 +78,15 @@ struct qd_connector_t {
long delay;
};
+
+/**
+ * One pending deferred call, held in a connection's deferred_calls list.
+ */
+typedef struct qd_deferred_call_t {
+ // List linkage for qd_deferred_call_list_t
+ DEQ_LINKS(struct qd_deferred_call_t);
+ // Function to invoke
+ qd_deferred_t call;
+ // Opaque value passed back to 'call' as its context argument
+ void *context;
+} qd_deferred_call_t;
+
+DEQ_DECLARE(qd_deferred_call_t, qd_deferred_call_list_t);
+
/**
* Connection objects wrap Proton connection objects.
*/
@@ -96,6 +105,9 @@ struct qd_connection_t {
void *user_context;
void *link_context; // Context shared by this connection's links
qd_user_fd_t *ufd;
+
+ qd_deferred_call_list_t deferred_calls;
+ sys_mutex_t *deferred_call_lock;
};
DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
@@ -157,6 +169,7 @@ struct qd_server_t {
ALLOC_DECLARE(qd_work_item_t);
ALLOC_DECLARE(qd_listener_t);
+ALLOC_DECLARE(qd_deferred_call_t);
ALLOC_DECLARE(qd_connector_t);
ALLOC_DECLARE(qd_connection_t);
ALLOC_DECLARE(qd_user_fd_t);
Modified: qpid/dispatch/trunk/tests/config-2-broker/A.conf
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config-2-broker/A.conf?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config-2-broker/A.conf (original)
+++ qpid/dispatch/trunk/tests/config-2-broker/A.conf Sat Jan 31 18:34:12 2015
@@ -73,12 +73,12 @@ connector {
}
linkRoutePattern {
- prefix: queue1/
+ prefix: queue.
connector: broker
}
linkRoutePattern {
- prefix: queue2/
+ prefix: exchange.
connector: broker
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org