You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/09/27 19:31:49 UTC
svn commit: r1526988 - in /qpid/trunk/qpid/extras/dispatch:
include/qpid/dispatch/ python/qpid/dispatch/router/ src/
Author: tross
Date: Fri Sep 27 17:31:48 2013
New Revision: 1526988
URL: http://svn.apache.org/r1526988
Log:
QPID-5045 - Added connection-resident shared state for links to associate links on a connection.
Modified:
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h
qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
qpid/trunk/qpid/extras/dispatch/src/container.c
qpid/trunk/qpid/extras/dispatch/src/router_node.c
qpid/trunk/qpid/extras/dispatch/src/router_private.h
qpid/trunk/qpid/extras/dispatch/src/server.c
qpid/trunk/qpid/extras/dispatch/src/server_private.h
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h Fri Sep 27 17:31:48 2013
@@ -152,8 +152,20 @@ dx_dist_mode_t dx_container_node_get_dis
dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node);
dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char *name);
+
+/**
+ * Context associated with the link for storing link-specific state.
+ */
void dx_link_set_context(dx_link_t *link, void *link_context);
void *dx_link_get_context(dx_link_t *link);
+
+/**
+ * Link context associated with the link's connection for storing state shared across
+ * all links in a connection.
+ */
+void dx_link_set_conn_context(dx_link_t *link, void *link_context);
+void *dx_link_get_conn_context(dx_link_t *link);
+
pn_link_t *dx_link_pn(dx_link_t *link);
pn_terminus_t *dx_link_source(dx_link_t *link);
pn_terminus_t *dx_link_target(dx_link_t *link);
Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h Fri Sep 27 17:31:48 2013
@@ -254,6 +254,24 @@ void *dx_connection_get_context(dx_conne
/**
+ * \brief Set the link context for a connection.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @param context Link context to be stored with the connection.
+ */
+void dx_connection_set_link_context(dx_connection_t *conn, void *context);
+
+
+/**
+ * \brief Get the link context from a connection.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @return The link context stored with the connection.
+ */
+void *dx_connection_get_link_context(dx_connection_t *conn);
+
+
+/**
* \brief Activate a connection for output.
*
* This function is used to request that the server activate the indicated
Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Fri Sep 27 17:31:48 2013
@@ -281,4 +281,4 @@ class RouterEngine:
def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \
(address, reachable, neighbor, link_bit, router_bit))
- self.router_adapter.node_updataed(address, reachable, neighbor, link_bit, router_bit)
+ self.router_adapter.node_updated(address, reachable, neighbor, link_bit, router_bit)
Modified: qpid/trunk/qpid/extras/dispatch/src/container.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/container.c?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/container.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/container.c Fri Sep 27 17:31:48 2013
@@ -651,6 +651,36 @@ void *dx_link_get_context(dx_link_t *lin
}
+void dx_link_set_conn_context(dx_link_t *link, void *context)
+{
+ pn_session_t *pn_sess = pn_link_session(link->pn_link);
+ if (!pn_sess)
+ return;
+ pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+ if (!pn_conn)
+ return;
+ dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+ if (!conn)
+ return;
+ dx_connection_set_link_context(conn, context);
+}
+
+
+void *dx_link_get_conn_context(dx_link_t *link)
+{
+ pn_session_t *pn_sess = pn_link_session(link->pn_link);
+ if (!pn_sess)
+ return 0;
+ pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+ if (!pn_conn)
+ return 0;
+ dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+ if (!conn)
+ return 0;
+ return dx_connection_get_link_context(conn);
+}
+
+
pn_link_t *dx_link_pn(dx_link_t *link)
{
return link->pn_link;
Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Fri Sep 27 17:31:48 2013
@@ -55,6 +55,7 @@ ALLOC_DEFINE(dx_router_node_t);
ALLOC_DEFINE(dx_router_ref_t);
ALLOC_DEFINE(dx_router_link_ref_t);
ALLOC_DEFINE(dx_address_t);
+ALLOC_DEFINE(dx_router_conn_t);
static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
@@ -95,6 +96,8 @@ static int dx_router_terminus_is_router(
{
pn_data_t *cap = pn_terminus_capabilities(term);
+ pn_data_rewind(cap);
+ pn_data_next(cap);
if (cap && pn_data_type(cap) == PN_SYMBOL) {
pn_bytes_t sym = pn_data_get_symbol(cap);
if (sym.size == strlen(DX_CAPABILITY_ROUTER) &&
@@ -121,9 +124,24 @@ static void dx_router_generate_temp_addr
}
-static int dx_router_find_mask_bit(dx_link_t *link)
+static int dx_router_find_mask_bit_LH(dx_router_t *router, dx_link_t *link)
{
- return 0; // TODO
+ dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+ if (shared)
+ return shared->mask_bit;
+
+ int mask_bit;
+ if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
+ dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
+ } else {
+ dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count");
+ return -1;
+ }
+
+ shared = new_dx_router_conn_t();
+ shared->mask_bit = mask_bit;
+ dx_link_set_conn_context(link, shared);
+ return mask_bit;
}
@@ -542,7 +560,6 @@ static int router_incoming_link_handler(
int is_router = dx_router_terminus_is_router(dx_link_remote_source(link));
DEQ_ITEM_INIT(rlink);
- rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0;
rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
rlink->link_direction = DX_INCOMING;
rlink->owning_addr = 0;
@@ -556,6 +573,7 @@ static int router_incoming_link_handler(
dx_link_set_context(link, rlink);
sys_mutex_lock(router->lock);
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0;
DEQ_INSERT_TAIL(router->links, rlink);
sys_mutex_unlock(router->lock);
@@ -599,7 +617,6 @@ static int router_outgoing_link_handler(
//
dx_router_link_t *rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
- rlink->mask_bit = is_router ? dx_router_find_mask_bit(link) : 0;
rlink->link_type = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
rlink->link_direction = DX_OUTGOING;
rlink->owning_addr = 0;
@@ -615,6 +632,7 @@ static int router_outgoing_link_handler(
pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
sys_mutex_lock(router->lock);
+ rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0;
if (is_router) {
//
@@ -675,6 +693,12 @@ static int router_link_detach_handler(vo
{
dx_router_t *router = (dx_router_t*) context;
dx_router_link_t *rlink = (dx_router_link_t*) dx_link_get_context(link);
+ dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+
+ if (shared) {
+ dx_link_set_conn_context(link, 0);
+ free_dx_router_conn_t(shared);
+ }
if (!rlink)
return 0;
@@ -766,7 +790,7 @@ static void router_outbound_open_handler
sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2);
// TODO - We don't want to have to cast away the constness of the literal string here!
// See PROTON-429
- pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
+ pn_data_put_symbol(pn_terminus_capabilities(dx_link_source(sender)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
rlink = new_dx_router_link_t();
DEQ_ITEM_INIT(rlink);
Modified: qpid/trunk/qpid/extras/dispatch/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_private.h?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_private.h Fri Sep 27 17:31:48 2013
@@ -23,6 +23,7 @@ typedef struct dx_router_link_t dx_r
typedef struct dx_router_node_t dx_router_node_t;
typedef struct dx_router_ref_t dx_router_ref_t;
typedef struct dx_router_link_ref_t dx_router_link_ref_t;
+typedef struct dx_router_conn_t dx_router_conn_t;
typedef enum {
@@ -91,6 +92,13 @@ ALLOC_DECLARE(dx_router_link_ref_t);
DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t);
+struct dx_router_conn_t {
+ int mask_bit;
+};
+
+ALLOC_DECLARE(dx_router_conn_t);
+
+
struct dx_address_t {
DEQ_LINKS(dx_address_t);
dx_router_message_cb handler; // In-Process Consumer
Modified: qpid/trunk/qpid/extras/dispatch/src/server.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/server.c?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/server.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/server.c Fri Sep 27 17:31:48 2013
@@ -109,6 +109,8 @@ static void thread_process_listeners(dx_
ctx->listener = (dx_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
+ ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = 0;
pn_connection_t *conn = pn_connection();
@@ -630,6 +632,7 @@ static void cxtr_try_open(void *context)
ctx->connector = ct;
ctx->context = ct->context;
ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = 0;
//
@@ -878,6 +881,18 @@ void *dx_connection_get_context(dx_conne
}
+void dx_connection_set_link_context(dx_connection_t *conn, void *context)
+{
+ conn->link_context = context;
+}
+
+
+void *dx_connection_get_link_context(dx_connection_t *conn)
+{
+ return conn->link_context;
+}
+
+
pn_connection_t *dx_connection_pn(dx_connection_t *conn)
{
return conn->pn_conn;
@@ -976,6 +991,7 @@ dx_user_fd_t *dx_user_fd(dx_dispatch_t *
ctx->connector = 0;
ctx->context = 0;
ctx->user_context = 0;
+ ctx->link_context = 0;
ctx->ufd = ufd;
ufd->context = context;
Modified: qpid/trunk/qpid/extras/dispatch/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/server_private.h?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/server_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/server_private.h Fri Sep 27 17:31:48 2013
@@ -78,6 +78,7 @@ struct dx_connection_t {
dx_connector_t *connector;
void *context; // Copy of context from listener or connector
void *user_context;
+ void *link_context; // Context shared by this connection's links
dx_user_fd_t *ufd;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org