You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/09/27 19:31:49 UTC

svn commit: r1526988 - in /qpid/trunk/qpid/extras/dispatch: include/qpid/dispatch/ python/qpid/dispatch/router/ src/

Author: tross
Date: Fri Sep 27 17:31:48 2013
New Revision: 1526988

URL: http://svn.apache.org/r1526988
Log:
QPID-5045 - Added connection-resident shared state for links to associate links on a connection.

Modified:
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
    qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h
    qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
    qpid/trunk/qpid/extras/dispatch/src/container.c
    qpid/trunk/qpid/extras/dispatch/src/router_node.c
    qpid/trunk/qpid/extras/dispatch/src/router_private.h
    qpid/trunk/qpid/extras/dispatch/src/server.c
    qpid/trunk/qpid/extras/dispatch/src/server_private.h

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/container.h Fri Sep 27 17:31:48 2013
@@ -152,8 +152,20 @@ dx_dist_mode_t dx_container_node_get_dis
 dx_lifetime_policy_t dx_container_node_get_life_policy(const dx_node_t *node);
 
 dx_link_t *dx_link(dx_node_t *node, dx_connection_t *conn, dx_direction_t dir, const char *name);
+
+/**
+ * Context associated with the link for storing link-specific state.
+ */
 void dx_link_set_context(dx_link_t *link, void *link_context);
 void *dx_link_get_context(dx_link_t *link);
+
+/**
+ * Link context associated with the link's connection for storing state shared across
+ * all links in a connection.
+ */
+void dx_link_set_conn_context(dx_link_t *link, void *link_context);
+void *dx_link_get_conn_context(dx_link_t *link);
+
 pn_link_t *dx_link_pn(dx_link_t *link);
 pn_terminus_t *dx_link_source(dx_link_t *link);
 pn_terminus_t *dx_link_target(dx_link_t *link);

Modified: qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h (original)
+++ qpid/trunk/qpid/extras/dispatch/include/qpid/dispatch/server.h Fri Sep 27 17:31:48 2013
@@ -254,6 +254,24 @@ void *dx_connection_get_context(dx_conne
 
 
 /**
+ * \brief Set the link context for a connection.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @param context Link context to be stored with the connection.
+ */
+void dx_connection_set_link_context(dx_connection_t *conn, void *context);
+
+
+/**
+ * \brief Get the link context from a connection.
+ *
+ * @param conn Connection object supplied in DX_CONN_EVENT_{LISTENER,CONNETOR}_OPEN
+ * @return The link context stored with the connection.
+ */
+void *dx_connection_get_link_context(dx_connection_t *conn);
+
+
+/**
  * \brief Activate a connection for output.
  *
  * This function is used to request that the server activate the indicated

Modified: qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py (original)
+++ qpid/trunk/qpid/extras/dispatch/python/qpid/dispatch/router/router_engine.py Fri Sep 27 17:31:48 2013
@@ -281,4 +281,4 @@ class RouterEngine:
     def node_updated(self, address, reachable, neighbor, link_bit, router_bit):
         self.log(LOG_DEBUG, "Event: node_updated: address=%s, reachable=%r, neighbor=%r, link_bit=%d, router_bit=%d" % \
                      (address, reachable, neighbor, link_bit, router_bit))
-        self.router_adapter.node_updataed(address, reachable, neighbor, link_bit, router_bit)
+        self.router_adapter.node_updated(address, reachable, neighbor, link_bit, router_bit)

Modified: qpid/trunk/qpid/extras/dispatch/src/container.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/container.c?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/container.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/container.c Fri Sep 27 17:31:48 2013
@@ -651,6 +651,36 @@ void *dx_link_get_context(dx_link_t *lin
 }
 
 
+void dx_link_set_conn_context(dx_link_t *link, void *context)
+{
+    pn_session_t *pn_sess = pn_link_session(link->pn_link);
+    if (!pn_sess)
+        return;
+    pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+    if (!pn_conn)
+        return;
+    dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+    if (!conn)
+        return;
+    dx_connection_set_link_context(conn, context);
+}
+
+
+void *dx_link_get_conn_context(dx_link_t *link)
+{
+    pn_session_t *pn_sess = pn_link_session(link->pn_link);
+    if (!pn_sess)
+        return 0;
+    pn_connection_t *pn_conn = pn_session_connection(pn_sess);
+    if (!pn_conn)
+        return 0;
+    dx_connection_t *conn = (dx_connection_t*) pn_connection_get_context(pn_conn);
+    if (!conn)
+        return 0;
+    return dx_connection_get_link_context(conn);
+}
+
+
 pn_link_t *dx_link_pn(dx_link_t *link)
 {
     return link->pn_link;

Modified: qpid/trunk/qpid/extras/dispatch/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_node.c?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_node.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_node.c Fri Sep 27 17:31:48 2013
@@ -55,6 +55,7 @@ ALLOC_DEFINE(dx_router_node_t);
 ALLOC_DEFINE(dx_router_ref_t);
 ALLOC_DEFINE(dx_router_link_ref_t);
 ALLOC_DEFINE(dx_address_t);
+ALLOC_DEFINE(dx_router_conn_t);
 
 
 static void dx_router_add_link_ref_LH(dx_router_link_ref_list_t *ref_list, dx_router_link_t *link)
@@ -95,6 +96,8 @@ static int dx_router_terminus_is_router(
 {
     pn_data_t *cap = pn_terminus_capabilities(term);
 
+    pn_data_rewind(cap);
+    pn_data_next(cap);
     if (cap && pn_data_type(cap) == PN_SYMBOL) {
         pn_bytes_t sym = pn_data_get_symbol(cap);
         if (sym.size == strlen(DX_CAPABILITY_ROUTER) &&
@@ -121,9 +124,24 @@ static void dx_router_generate_temp_addr
 }
 
 
-static int dx_router_find_mask_bit(dx_link_t *link)
+static int dx_router_find_mask_bit_LH(dx_router_t *router, dx_link_t *link)
 {
-    return 0;  // TODO
+    dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+    if (shared)
+        return shared->mask_bit;
+
+    int mask_bit;
+    if (dx_bitmask_first_set(router->neighbor_free_mask, &mask_bit)) {
+        dx_bitmask_clear_bit(router->neighbor_free_mask, mask_bit);
+    } else {
+        dx_log(module, LOG_CRITICAL, "Exceeded maximum inter-router link count");
+        return -1;
+    }
+
+    shared = new_dx_router_conn_t();
+    shared->mask_bit = mask_bit;
+    dx_link_set_conn_context(link, shared);
+    return mask_bit;
 }
 
 
@@ -542,7 +560,6 @@ static int router_incoming_link_handler(
     int is_router             = dx_router_terminus_is_router(dx_link_remote_source(link));
 
     DEQ_ITEM_INIT(rlink);
-    rlink->mask_bit       = is_router ? dx_router_find_mask_bit(link) : 0;
     rlink->link_type      = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
     rlink->link_direction = DX_INCOMING;
     rlink->owning_addr    = 0;
@@ -556,6 +573,7 @@ static int router_incoming_link_handler(
     dx_link_set_context(link, rlink);
 
     sys_mutex_lock(router->lock);
+    rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0;
     DEQ_INSERT_TAIL(router->links, rlink);
     sys_mutex_unlock(router->lock);
 
@@ -599,7 +617,6 @@ static int router_outgoing_link_handler(
     //
     dx_router_link_t *rlink = new_dx_router_link_t();
     DEQ_ITEM_INIT(rlink);
-    rlink->mask_bit       = is_router ? dx_router_find_mask_bit(link) : 0;
     rlink->link_type      = is_router ? DX_LINK_ROUTER : DX_LINK_ENDPOINT;
     rlink->link_direction = DX_OUTGOING;
     rlink->owning_addr    = 0;
@@ -615,6 +632,7 @@ static int router_outgoing_link_handler(
     pn_terminus_copy(dx_link_target(link), dx_link_remote_target(link));
 
     sys_mutex_lock(router->lock);
+    rlink->mask_bit = is_router ? dx_router_find_mask_bit_LH(router, link) : 0;
 
     if (is_router) {
         //
@@ -675,6 +693,12 @@ static int router_link_detach_handler(vo
 {
     dx_router_t      *router = (dx_router_t*) context;
     dx_router_link_t *rlink  = (dx_router_link_t*) dx_link_get_context(link);
+    dx_router_conn_t *shared = (dx_router_conn_t*) dx_link_get_conn_context(link);
+
+    if (shared) {
+        dx_link_set_conn_context(link, 0);
+        free_dx_router_conn_t(shared);
+    }
 
     if (!rlink)
         return 0;
@@ -766,7 +790,7 @@ static void router_outbound_open_handler
     sender = dx_link(router->node, conn, DX_OUTGOING, DX_INTERNODE_LINK_NAME_2);
     // TODO - We don't want to have to cast away the constness of the literal string here!
     //        See PROTON-429
-    pn_data_put_symbol(pn_terminus_capabilities(dx_link_target(receiver)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
+    pn_data_put_symbol(pn_terminus_capabilities(dx_link_source(sender)), pn_bytes(clen, (char *) DX_CAPABILITY_ROUTER));
 
     rlink = new_dx_router_link_t();
     DEQ_ITEM_INIT(rlink);

Modified: qpid/trunk/qpid/extras/dispatch/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/router_private.h?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/router_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/router_private.h Fri Sep 27 17:31:48 2013
@@ -23,6 +23,7 @@ typedef struct dx_router_link_t     dx_r
 typedef struct dx_router_node_t     dx_router_node_t;
 typedef struct dx_router_ref_t      dx_router_ref_t;
 typedef struct dx_router_link_ref_t dx_router_link_ref_t;
+typedef struct dx_router_conn_t     dx_router_conn_t;
 
 
 typedef enum {
@@ -91,6 +92,13 @@ ALLOC_DECLARE(dx_router_link_ref_t);
 DEQ_DECLARE(dx_router_link_ref_t, dx_router_link_ref_list_t);
 
 
+struct dx_router_conn_t {
+    int mask_bit;
+};
+
+ALLOC_DECLARE(dx_router_conn_t);
+
+
 struct dx_address_t {
     DEQ_LINKS(dx_address_t);
     dx_router_message_cb       handler;          // In-Process Consumer

Modified: qpid/trunk/qpid/extras/dispatch/src/server.c
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/server.c?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/server.c (original)
+++ qpid/trunk/qpid/extras/dispatch/src/server.c Fri Sep 27 17:31:48 2013
@@ -109,6 +109,8 @@ static void thread_process_listeners(dx_
         ctx->listener     = (dx_listener_t*) pn_listener_context(listener);
         ctx->connector    = 0;
         ctx->context      = ctx->listener->context;
+        ctx->user_context = 0;
+        ctx->link_context = 0;
         ctx->ufd          = 0;
 
         pn_connection_t *conn = pn_connection();
@@ -630,6 +632,7 @@ static void cxtr_try_open(void *context)
     ctx->connector    = ct;
     ctx->context      = ct->context;
     ctx->user_context = 0;
+    ctx->link_context = 0;
     ctx->ufd          = 0;
 
     //
@@ -878,6 +881,18 @@ void *dx_connection_get_context(dx_conne
 }
 
 
+void dx_connection_set_link_context(dx_connection_t *conn, void *context)
+{
+    conn->link_context = context;
+}
+
+
+void *dx_connection_get_link_context(dx_connection_t *conn)
+{
+    return conn->link_context;
+}
+
+
 pn_connection_t *dx_connection_pn(dx_connection_t *conn)
 {
     return conn->pn_conn;
@@ -976,6 +991,7 @@ dx_user_fd_t *dx_user_fd(dx_dispatch_t *
     ctx->connector    = 0;
     ctx->context      = 0;
     ctx->user_context = 0;
+    ctx->link_context = 0;
     ctx->ufd          = ufd;
 
     ufd->context = context;

Modified: qpid/trunk/qpid/extras/dispatch/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/dispatch/src/server_private.h?rev=1526988&r1=1526987&r2=1526988&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/dispatch/src/server_private.h (original)
+++ qpid/trunk/qpid/extras/dispatch/src/server_private.h Fri Sep 27 17:31:48 2013
@@ -78,6 +78,7 @@ struct dx_connection_t {
     dx_connector_t  *connector;
     void            *context; // Copy of context from listener or connector
     void            *user_context;
+    void            *link_context; // Context shared by this connection's links
     dx_user_fd_t    *ufd;
 };
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org