You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/01/31 19:34:12 UTC

svn commit: r1656233 - in /qpid/dispatch/trunk: include/qpid/dispatch/server.h src/lrp.c src/router_node.c src/server.c src/server_private.h tests/config-2-broker/A.conf

Author: tross
Date: Sat Jan 31 18:34:12 2015
New Revision: 1656233

URL: http://svn.apache.org/r1656233
Log:
DISPATCH-6 - Completed link-attach propagation for senders
  - Added deferred-call-on-connection in the server module
  - Terminus lookup is a simple single prefix-by-dot algorithm for now

Modified:
    qpid/dispatch/trunk/include/qpid/dispatch/server.h
    qpid/dispatch/trunk/src/lrp.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/server.c
    qpid/dispatch/trunk/src/server_private.h
    qpid/dispatch/trunk/tests/config-2-broker/A.conf

Modified: qpid/dispatch/trunk/include/qpid/dispatch/server.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/server.h?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/server.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/server.h Sat Jan 31 18:34:12 2015
@@ -49,6 +49,19 @@ typedef void (*qd_thread_start_cb_t)(voi
 
 
 /**
+ * Deferred callback
+ *
+ * This type is for calls that are deferred until they can be invoked on
+ * a specific connection's thread.
+ *
+ * The callback is invoked whether discard is true or false, so it is the
+ * place to release anything owned by the context.
+ *
+ * @param context An opaque context to be passed back with the call.
+ * @param discard If true, the call should be discarded because the connection it
+ *        was pending on was deleted.  In that case the callback should only
+ *        clean up and not act on the connection.
+ */
+typedef void (*qd_deferred_t)(void *context, bool discard);
+
+
+/**
  * Set the optional thread-start handler.
  *
  * This handler is called once on each worker thread at the time the thread is
@@ -455,6 +468,17 @@ const qd_server_config_t *qd_connection_
 
 
 /**
+ * Schedule a call to be invoked on a thread that has ownership of this connection.
+ * It will be safe for the callback to perform operations related to this connection.
+ *
+ * The call is appended to the connection's list of pending deferred calls and the
+ * connection is then activated.  If the connection is deleted before the call has
+ * run, the callback is still invoked, with its discard argument set to true.
+ *
+ * @param conn Connection object
+ * @param call The function to be invoked on the connection's thread
+ * @param context The context to be passed back in the callback
+ */
+void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, void *context);
+
+
+/**
  * Create a listener for incoming connections.
  *
  * @param qd The dispatch handle returned by qd_dispatch.

Modified: qpid/dispatch/trunk/src/lrp.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/lrp.c?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/lrp.c (original)
+++ qpid/dispatch/trunk/src/lrp.c Sat Jan 31 18:34:12 2015
@@ -33,6 +33,8 @@ static void qd_lrpc_open_handler(void *c
     qd_lrp_t           *lrp    = DEQ_HEAD(lrpc->lrps);
     qd_router_t        *router = lrpc->qd->router;
 
+    lrpc->conn = conn;
+
     while (lrp) {
         qd_address_t        *addr;
         qd_field_iterator_t *iter;
@@ -90,6 +92,8 @@ static void qd_lrpc_close_handler(void *
     qd_lrp_t           *lrp    = DEQ_HEAD(lrpc->lrps);
     qd_router_t        *router = lrpc->qd->router;
 
+    lrpc->conn = 0;
+
     while (lrp) {
         qd_address_t        *addr;
         qd_field_iterator_t *iter;

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Sat Jan 31 18:34:12 2015
@@ -319,6 +319,36 @@ static int qd_router_find_mask_bit_LH(qd
 
 
 /**
+ * Look up the address record that matches a link terminus address.
+ *
+ * For now, the lookup key is the leading portion of the terminus address up to
+ * and including the first '.' (or the whole address if it contains no '.').
+ * The key is searched for in the router's address hash with the address prefix
+ * overridden to 'C'.
+ *
+ * NOTE(review): the _LH suffix and the visible caller suggest that the router
+ * lock must be held when calling this function - confirm.
+ *
+ * @param router The router whose address hash is searched.
+ * @param taddr The terminus address; may be null or empty.
+ * @return The matching address record, or 0 if taddr is null/empty or no
+ *         record was found.
+ */
+static qd_address_t *router_lookup_terminus_LH(qd_router_t *router, const char *taddr)
+{
+    if (taddr == 0 || *taddr == '\0')
+        return 0;
+
+    //
+    // For now: Find the first instance of a '.' in the address and search for the text
+    // up to and including this instance.
+    //
+    const char *cursor = taddr;
+    while (*cursor && *cursor != '.')
+        cursor++;
+    if (*cursor == '.')
+        cursor++;
+    int len = (int) (cursor - taddr);
+
+    qd_field_iterator_t *iter = qd_field_iterator_binary(taddr, len, ITER_VIEW_ADDRESS_HASH);
+    qd_field_iterator_override_prefix(iter, 'C');
+
+    //
+    // Start from a null result so that 0 is returned even if the hash lookup
+    // leaves its output untouched on a miss.
+    //
+    qd_address_t *addr = 0;
+    qd_hash_retrieve(router->addr_hash, iter, (void*) &addr);
+    qd_field_iterator_free(iter);
+
+    return addr;
+}
+
+
+/**
  * Outgoing Link Writable Handler
  */
 static int router_writable_link_handler(void* context, qd_link_t *link)
@@ -404,7 +434,7 @@ static int router_writable_link_handler(
                 if (qd_delivery_settled(re->delivery)) {
                     qd_link_t         *peer_link  = qd_delivery_link(re->delivery);
                     qd_router_link_t  *peer_rlink = (qd_router_link_t*) qd_link_get_context(peer_link);
-                    qd_routed_event_t *return_re = new_qd_routed_event_t();
+                    qd_routed_event_t *return_re  = new_qd_routed_event_t();
                     DEQ_ITEM_INIT(return_re);
                     return_re->delivery    = re->delivery;
                     return_re->message     = 0;
@@ -953,7 +983,7 @@ static void router_rx_handler(void* cont
 /**
  * Delivery Disposition Handler
  */
-static void router_disp_handler(void* context, qd_link_t *link, qd_delivery_t *delivery)
+static void router_disposition_handler(void* context, qd_link_t *link, qd_delivery_t *delivery)
 {
     qd_router_t   *router  = (qd_router_t*) context;
     bool           changed = qd_delivery_disp_changed(delivery);
@@ -992,6 +1022,56 @@ static void router_disp_handler(void* co
 }
 
 
+typedef struct link_attach_t {       // Context handed to the deferred call that creates a routed link
+    qd_router_t      *router;        // Router whose node and link list the new link is added to
+    qd_router_link_t *peer_link;     // Router link that gets cross-connected to the new link
+    qd_link_t        *peer_qd_link;  // Link whose remote source/target are copied onto the new link
+    char             *link_name;     // Heap-allocated (strdup) name for the new link; freed by the deferred call
+    qd_direction_t    dir;           // Direction of the new link
+    qd_connection_t  *conn;          // Connection on which the new link is created
+} link_attach_t;
+
+ALLOC_DECLARE(link_attach_t);
+ALLOC_DEFINE(link_attach_t);
+
+
+/**
+ * Deferred call: create the second half of a routed link.
+ *
+ * Unless the call is being discarded, a new link is created on la->conn, a
+ * router-link record is attached to it and cross-connected with the peer link
+ * under the router lock, the peer's remote source and target are copied onto
+ * the new link, and the new link is opened.
+ *
+ * The link_attach_t context (including its link name) is released in every
+ * case, so the scheduler of this call must not touch it afterwards.
+ *
+ * NOTE(review): la->peer_link and la->peer_qd_link are used here without any
+ * check that the peer link still exists when the deferred call finally runs -
+ * confirm that the peer cannot be detached and freed in the meantime.
+ *
+ * NOTE(review): the incoming-link handler registers its router link with
+ * qd_entity_cache_add(); this function does not - confirm whether routed
+ * links should be registered as well.
+ *
+ * @param context A link_attach_t describing the link to create.
+ * @param discard If true, the connection is gone: only release the context.
+ */
+static void qd_router_attach_routed_link(void *context, bool discard)
+{
+    link_attach_t *la = (link_attach_t*) context;
+
+    if (!discard) {
+        qd_link_t        *link  = qd_link(la->router->node, la->conn, la->dir, la->link_name);
+        qd_router_link_t *rlink = new_qd_router_link_t();
+        DEQ_ITEM_INIT(rlink);
+        rlink->link_type      = QD_LINK_ENDPOINT;
+        rlink->link_direction = la->dir;
+        rlink->mask_bit       = 0;  // Not a router link; same value the incoming handler uses
+        rlink->owning_addr    = 0;
+        rlink->waypoint       = 0;
+        rlink->link           = link;
+        rlink->ref            = 0;
+        rlink->target         = 0;
+        DEQ_INIT(rlink->event_fifo);
+        DEQ_INIT(rlink->msg_fifo);
+        qd_link_set_context(link, rlink);
+
+        //
+        // Cross-connect the two router links and publish the new one while
+        // holding the router lock.
+        //
+        sys_mutex_lock(la->router->lock);
+        rlink->connected_link = la->peer_link;
+        la->peer_link->connected_link = rlink;
+        DEQ_INSERT_TAIL(la->router->links, rlink);
+        sys_mutex_unlock(la->router->lock);
+
+        //
+        // Propagate the peer's requested termini onto the new link before opening it.
+        //
+        pn_terminus_copy(qd_link_source(link), qd_link_remote_source(la->peer_qd_link));
+        pn_terminus_copy(qd_link_target(link), qd_link_remote_target(la->peer_qd_link));
+
+        pn_link_open(qd_link_pn(link));
+    }
+
+    //
+    // The context is owned by this call whether or not it was discarded.
+    // free() accepts a null pointer, so no guard is needed.
+    //
+    free(la->link_name);
+    free_link_attach_t(la);
+}
+
+
 /**
  * New Incoming Link Handler
  */
@@ -1003,7 +1083,8 @@ static int router_incoming_link_handler(
     const char  *r_tgt     = pn_terminus_get_address(qd_link_remote_target(link));
 
     if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
-        qd_log(router->log_source, QD_LOG_WARNING, "Incoming link claims router capability but is not on an inter-router connection");
+        qd_log(router->log_source, QD_LOG_WARNING,
+               "Incoming link claims router capability but is not on an inter-router connection");
         pn_link_close(pn_link);
         return 0;
     }
@@ -1032,6 +1113,56 @@ static int router_incoming_link_handler(
     rlink->mask_bit = is_router ? qd_router_find_mask_bit_LH(router, link) : 0;
     qd_entity_cache_add(QD_ROUTER_LINK_TYPE, rlink);
     DEQ_INSERT_TAIL(router->links, rlink);
+
+    //
+    // Lookup the target address to see if we can link-route this attach.
+    //
+    qd_address_t *addr = router_lookup_terminus_LH(router, r_tgt);
+    if (addr && !is_router) {
+        //
+        // This is a link-attach routable target.  Propagate the attach downrange.
+        // Check first for a locally connected container.
+        //
+        qd_router_lrp_ref_t *lrpref = DEQ_HEAD(addr->lrps);
+        if (lrpref) {
+            qd_connection_t *conn = lrpref->lrp->container->conn;
+            if (conn) {
+                //
+                // The attach must be performed on the thread that owns the
+                // container's connection, so hand it over as a deferred call.
+                // The deferred call takes ownership of 'la'.
+                //
+                link_attach_t *la = new_link_attach_t();
+                la->router       = router;
+                la->peer_link    = rlink;
+                la->peer_qd_link = link;
+                la->link_name    = strdup(pn_link_name(pn_link));
+                la->dir          = QD_OUTGOING;
+                la->conn         = conn;
+                qd_connection_invoke_deferred(conn, qd_router_attach_routed_link, la);
+            }
+        } else if (DEQ_SIZE(addr->rnodes) > 0) {
+            //
+            // There are no locally connected containers for this link but there is at
+            // least one on a remote router.  Forward the attach toward the remote destination.
+            //
+            qd_router_node_t *remote_router = DEQ_HEAD(addr->rnodes)->router;
+
+            //
+            // out_link must start out null: it is tested below even when there
+            // is no remote router to take a link from.  Prefer the direct link
+            // to the remote router and fall back to the link to its next hop.
+            //
+            qd_router_link_t *out_link = 0;
+            if (remote_router) {
+                out_link = remote_router->peer_link;
+                if (!out_link && remote_router->next_hop)
+                    out_link = remote_router->next_hop->peer_link;
+            }
+            if (out_link) {
+                qd_connection_t *out_conn = qd_link_connection(out_link->link);
+                if (out_conn) {
+                    //
+                    // As above: defer the attach to the thread that owns the
+                    // outgoing connection.  The deferred call takes ownership of 'la'.
+                    //
+                    link_attach_t *la = new_link_attach_t();
+                    la->router       = router;
+                    la->peer_link    = rlink;
+                    la->peer_qd_link = link;
+                    la->link_name    = strdup(pn_link_name(pn_link));
+                    la->dir          = QD_OUTGOING;
+                    la->conn         = out_conn;
+                    qd_connection_invoke_deferred(out_conn, qd_router_attach_routed_link, la);
+                }
+            }
+        }
+    }
+
     sys_mutex_unlock(router->lock);
 
     pn_terminus_copy(qd_link_source(link), qd_link_remote_source(link));
@@ -1039,11 +1170,6 @@ static int router_incoming_link_handler(
     pn_link_flow(pn_link, 1000);
     pn_link_open(pn_link);
 
-    //
-    // TODO - If the address has link-route semantics, create all associated
-    //        links needed to go with this one.
-    //
-
     return 0;
 }
 
@@ -1065,7 +1191,8 @@ static int router_outgoing_link_handler(
     qd_address_t           *addr = 0;
 
     if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
-        qd_log(router->log_source, QD_LOG_WARNING, "Outgoing link claims router capability but is not on an inter-router connection");
+        qd_log(router->log_source, QD_LOG_WARNING,
+               "Outgoing link claims router capability but is not on an inter-router connection");
         pn_link_close(pn_link);
         return 0;
     }
@@ -1092,7 +1219,8 @@ static int router_outgoing_link_handler(
         if (prefix != 'M') {
             qd_field_iterator_free(iter);
             pn_link_close(pn_link);
-            qd_log(router->log_source, QD_LOG_WARNING, "Rejected an outgoing endpoint link with a router address: %s", r_src);
+            qd_log(router->log_source, QD_LOG_WARNING,
+                   "Rejected an outgoing endpoint link with a router address: %s", r_src);
             return 0;
         }
     }
@@ -1244,7 +1372,8 @@ static int router_link_detach_handler(vo
         if (router->out_links_by_mask_bit[rlink->mask_bit] == rlink)
             router->out_links_by_mask_bit[rlink->mask_bit] = 0;
         else
-            qd_log(router->log_source, QD_LOG_CRITICAL, "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit);
+            qd_log(router->log_source, QD_LOG_CRITICAL,
+                   "Outgoing router link closing but not in index: bit=%d", rlink->mask_bit);
     }
 
     //
@@ -1322,7 +1451,8 @@ static void router_outbound_open_handler
     receiver = qd_link(router->node, conn, QD_INCOMING, QD_INTERNODE_LINK_NAME_1);
     // TODO - We don't want to have to cast away the constness of the literal string here!
     //        See PROTON-429
-    pn_data_put_symbol(pn_terminus_capabilities(qd_link_target(receiver)), pn_bytes(clen, (char*) QD_CAPABILITY_ROUTER));
+    pn_data_put_symbol(pn_terminus_capabilities(qd_link_target(receiver)),
+                       pn_bytes(clen, (char*) QD_CAPABILITY_ROUTER));
 
     rlink = new_qd_router_link_t();
     DEQ_ITEM_INIT(rlink);
@@ -1348,7 +1478,8 @@ static void router_outbound_open_handler
     sender = qd_link(router->node, conn, QD_OUTGOING, QD_INTERNODE_LINK_NAME_2);
     // TODO - We don't want to have to cast away the constness of the literal string here!
     //        See PROTON-429
-    pn_data_put_symbol(pn_terminus_capabilities(qd_link_source(sender)), pn_bytes(clen, (char *) QD_CAPABILITY_ROUTER));
+    pn_data_put_symbol(pn_terminus_capabilities(qd_link_source(sender)),
+                       pn_bytes(clen, (char *) QD_CAPABILITY_ROUTER));
 
     rlink = new_qd_router_link_t();
     DEQ_ITEM_INIT(rlink);
@@ -1400,7 +1531,7 @@ static void qd_router_timer_handler(void
 
 static qd_node_type_t router_node = {"router", 0, 0,
                                      router_rx_handler,
-                                     router_disp_handler,
+                                     router_disposition_handler,
                                      router_incoming_link_handler,
                                      router_outgoing_link_handler,
                                      router_writable_link_handler,

Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Sat Jan 31 18:34:12 2015
@@ -36,6 +36,7 @@ static __thread qd_server_t *thread_serv
 ALLOC_DEFINE(qd_work_item_t);
 ALLOC_DEFINE(qd_listener_t);
 ALLOC_DEFINE(qd_connector_t);
+ALLOC_DEFINE(qd_deferred_call_t);
 ALLOC_DEFINE(qd_connection_t);
 ALLOC_DEFINE(qd_user_fd_t);
 
@@ -103,10 +104,10 @@ qd_error_t qd_entity_refresh_connection(
 
 static void thread_process_listeners(qd_server_t *qd_server)
 {
-    qdpn_driver_t     *driver   = qd_server->driver;
-    qdpn_listener_t   *listener;
-    qdpn_connector_t  *cxtr;
-    qd_connection_t *ctx;
+    qdpn_driver_t    *driver = qd_server->driver;
+    qdpn_listener_t  *listener;
+    qdpn_connector_t *cxtr;
+    qd_connection_t  *ctx;
 
     for (listener = qdpn_driver_listener(driver); listener; listener = qdpn_driver_listener(driver)) {
         cxtr = qdpn_listener_accept(listener);
@@ -127,6 +128,8 @@ static void thread_process_listeners(qd_
         ctx->user_context = 0;
         ctx->link_context = 0;
         ctx->ufd          = 0;
+        DEQ_INIT(ctx->deferred_calls);
+        ctx->deferred_call_lock = sys_mutex();
 
         size_t clen = strlen(QD_CAPABILITY_ANONYMOUS_RELAY);
         pn_connection_t *conn = pn_connection();
@@ -229,6 +232,37 @@ static void block_if_paused_LH(qd_server
 }
 
 
+/**
+ * Run every deferred call currently pending on a connection.
+ *
+ * The pending calls are first moved to a private list under the connection's
+ * deferred-call lock and are then invoked, in queue order, with the lock
+ * released.  Each call record is freed after its callback returns.
+ *
+ * @param conn    The connection whose pending calls are drained.
+ * @param discard Passed through to every callback.
+ */
+static void invoke_deferred_calls(qd_connection_t *conn, bool discard)
+{
+    qd_deferred_call_list_t  pending;
+    qd_deferred_call_t      *item;
+
+    DEQ_INIT(pending);
+
+    //
+    // Drain the connection's queue into the private list while holding the lock.
+    //
+    sys_mutex_lock(conn->deferred_call_lock);
+    for (item = DEQ_HEAD(conn->deferred_calls); item; item = DEQ_HEAD(conn->deferred_calls)) {
+        DEQ_REMOVE_HEAD(conn->deferred_calls);
+        DEQ_INSERT_TAIL(pending, item);
+    }
+    sys_mutex_unlock(conn->deferred_call_lock);
+
+    //
+    // The callbacks run without the lock held.
+    //
+    for (item = DEQ_HEAD(pending); item; item = DEQ_HEAD(pending)) {
+        DEQ_REMOVE_HEAD(pending);
+        item->call(item->context, discard);
+        free_qd_deferred_call_t(item);
+    }
+}
+
+
 static int process_connector(qd_server_t *qd_server, qdpn_connector_t *cxtr)
 {
     qd_connection_t *ctx = qdpn_connector_context(cxtr);
@@ -314,10 +348,12 @@ static int process_connector(qd_server_t
                                         (qd_connection_t*) qdpn_connector_context(cxtr));
                 events = 0;
             }
-            else
+            else {
+                invoke_deferred_calls(ctx, false);
                 events = qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
                                                  QD_CONN_EVENT_PROCESS,
                                                  (qd_connection_t*) qdpn_connector_context(cxtr));
+            }
             break;
 
         default:
@@ -568,6 +604,8 @@ static void *thread_run(void *arg)
                     pn_connection_free(conn);
                 if (ctx->collector)
                     pn_collector_free(ctx->collector);
+                invoke_deferred_calls(ctx, true);  // Discard any pending deferred calls
+                sys_mutex_free(ctx->deferred_call_lock);
                 free_qd_connection_t(ctx);
                 qd_server->threads_active--;
                 sys_mutex_unlock(qd_server->lock);
@@ -655,6 +693,8 @@ static void cxtr_try_open(void *context)
     ctx->user_context = 0;
     ctx->link_context = 0;
     ctx->ufd          = 0;
+    DEQ_INIT(ctx->deferred_calls);
+    ctx->deferred_call_lock = sys_mutex();
 
     //
     // qdpn_connector is not thread safe
@@ -995,6 +1035,21 @@ const qd_server_config_t *qd_connection_
 }
 
 
+/**
+ * Queue a call on a connection and activate the connection.
+ *
+ * @param conn    The connection the call is queued on.
+ * @param call    The callback to queue.
+ * @param context Opaque value handed back to the callback.
+ */
+void qd_connection_invoke_deferred(qd_connection_t *conn, qd_deferred_t call, void *context)
+{
+    qd_deferred_call_t *entry = new_qd_deferred_call_t();
+
+    DEQ_ITEM_INIT(entry);
+    entry->context = context;
+    entry->call    = call;
+
+    //
+    // Append under the lock that also guards draining of this list.
+    //
+    sys_mutex_lock(conn->deferred_call_lock);
+    DEQ_INSERT_TAIL(conn->deferred_calls, entry);
+    sys_mutex_unlock(conn->deferred_call_lock);
+
+    //
+    // Activate the connection now that the call is queued.
+    //
+    qd_server_activate(conn);
+}
+
+
 qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
 {
     qd_server_t   *qd_server = qd->server;
@@ -1097,6 +1152,8 @@ qd_user_fd_t *qd_user_fd(qd_dispatch_t *
     ctx->user_context = 0;
     ctx->link_context = 0;
     ctx->ufd          = ufd;
+    DEQ_INIT(ctx->deferred_calls);
+    ctx->deferred_call_lock = sys_mutex();
 
     ufd->context = context;
     ufd->server  = qd_server;

Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Sat Jan 31 18:34:12 2015
@@ -78,6 +78,15 @@ struct qd_connector_t {
     long                      delay;
 };
 
+
+typedef struct qd_deferred_call_t {        // One deferred call queued on a connection
+    DEQ_LINKS(struct qd_deferred_call_t);  // List linkage (see qd_deferred_call_list_t)
+    qd_deferred_t  call;                   // Callback to invoke
+    void          *context;                // Passed back as the callback's first argument
+} qd_deferred_call_t;
+
+DEQ_DECLARE(qd_deferred_call_t, qd_deferred_call_list_t);
+
 /**
  * Connection objects wrap Proton connection objects.
  */
@@ -96,6 +105,9 @@ struct qd_connection_t {
     void             *user_context;
     void             *link_context; // Context shared by this connection's links
     qd_user_fd_t     *ufd;
+
+    qd_deferred_call_list_t  deferred_calls;
+    sys_mutex_t             *deferred_call_lock;
 };
 
 DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
@@ -157,6 +169,7 @@ struct qd_server_t {
 
 ALLOC_DECLARE(qd_work_item_t);
 ALLOC_DECLARE(qd_listener_t);
+ALLOC_DECLARE(qd_deferred_call_t);
 ALLOC_DECLARE(qd_connector_t);
 ALLOC_DECLARE(qd_connection_t);
 ALLOC_DECLARE(qd_user_fd_t);

Modified: qpid/dispatch/trunk/tests/config-2-broker/A.conf
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/config-2-broker/A.conf?rev=1656233&r1=1656232&r2=1656233&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/config-2-broker/A.conf (original)
+++ qpid/dispatch/trunk/tests/config-2-broker/A.conf Sat Jan 31 18:34:12 2015
@@ -73,12 +73,12 @@ connector {
 }
 
 linkRoutePattern {
-    prefix: queue1/
+    prefix: queue.
     connector: broker
 }
 
 linkRoutePattern {
-    prefix: queue2/
+    prefix: exchange.
     connector: broker
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org