You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/12/22 17:46:17 UTC

[1/2] qpid-dispatch git commit: DISPATCH-179 - WIP - Formalized the notion of in-process subscription in preparation for implementing message delivery through the router core.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 d218eeeaa -> 8be2a0a92


DISPATCH-179 - WIP - Formalized the notion of in-process subscription in preparation for
                     implementing message delivery through the router core.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/363baaec
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/363baaec
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/363baaec

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 363baaec0dbd9ded69bf33498151c2c777fc1c1c
Parents: d218eee
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Dec 21 11:15:11 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Dec 21 11:15:11 2015 -0500

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h           |  39 ++++--
 python/qpid_dispatch/management/qdrouter.json |   2 +-
 src/router_core/agent_address.c               |   2 +-
 src/router_core/connections.c                 |   4 +-
 src/router_core/route_tables.c                | 131 +++++++++------------
 src/router_core/router_core_private.h         |  44 ++++---
 tools/qdstat                                  |   2 +-
 7 files changed, 116 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9616b6b..39d8933 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -32,12 +32,13 @@
  * exclusive access to that connection.
  */
 
-typedef struct qdr_core_t       qdr_core_t;
-typedef struct qdr_connection_t qdr_connection_t;
-typedef struct qdr_link_t       qdr_link_t;
-typedef struct qdr_delivery_t   qdr_delivery_t;
-typedef struct qdr_terminus_t   qdr_terminus_t;
-typedef struct qdr_error_t      qdr_error_t;
+typedef struct qdr_core_t         qdr_core_t;
+typedef struct qdr_connection_t   qdr_connection_t;
+typedef struct qdr_link_t         qdr_link_t;
+typedef struct qdr_delivery_t     qdr_delivery_t;
+typedef struct qdr_terminus_t     qdr_terminus_t;
+typedef struct qdr_error_t        qdr_error_t;
+typedef struct qdr_subscription_t qdr_subscription_t;
 
 /**
  * Allocate and start an instance of the router core module.
@@ -76,13 +77,20 @@ void qdr_core_route_table_handlers(qdr_core_t           *core,
 
 /**
  ******************************************************************************
- * In-process message-receiver functions
+ * In-process messaging functions
  ******************************************************************************
  */
 typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit);
 
-void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
-                        qd_address_semantics_t sem, qdr_receive_t on_message, void *context);
+qdr_subscription_t *qdr_core_subscribe(qdr_core_t             *core,
+                                       const char             *address,
+                                       char                    aclass,
+                                       char                    phase,
+                                       qd_address_semantics_t  semantics,
+                                       qdr_receive_t           on_message,
+                                       void                   *context);
+
+void qdr_core_unsubscribe(qdr_subscription_t *sub);
 
 
 /**
@@ -392,6 +400,19 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
 qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg);
 qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr);
 
+/**
+ * qdr_send_to
+ *
+ * Send a message to a destination.  This function is used only by in-process components that
+ * create messages to be sent.  For these messages, there is no inbound link or delivery.
+ *
+ * @param core Pointer to the core module
+ * @param msg Pointer to the message to be sent.  The message will be copied during the call
+ *            and must be freed by the caller if the caller doesn't need to hold it for later use.
+ * @param exclude_inprocess If true, the message will not be sent to in-process subscribers.
+ */
+void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess);
+
 typedef void (*qdr_link_first_attach_t)  (void *context, qdr_connection_t *conn, qdr_link_t *link, 
                                           qdr_terminus_t *source, qdr_terminus_t *target);
 typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 41b628a..dd54457 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -846,7 +846,7 @@
             "description": "AMQP address managed by the router.",
             "extends": "operationalEntity",
             "attributes": {
-                "inProcess": {"type": "boolean"},
+                "inProcess": {"type": "integer"},
                 "subscriberCount": {"type": "integer", "graph": true},
                 "remoteCount": {"type": "integer", "graph": true},
                 "deliveriesIngress": {"type": "integer", "graph": true},

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index ef8155b..e8e82ac 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -54,7 +54,7 @@ static void qdr_insert_address_columns_CT(qdr_address_t        *addr,
             break;
 
         case QDR_ADDRESS_IN_PROCESS:
-            qd_compose_insert_bool(body, addr->on_message != 0);
+            qd_compose_insert_uint(body, DEQ_SIZE(addr->subscriptions));
             break;
 
         case QDR_ADDRESS_SUBSCRIBER_COUNT:

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index db469f6..bb6aad9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -387,7 +387,7 @@ static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_fie
  * Depending on its policy, the address may be eligible for being closed out
  * (i.e. Logging its terminal statistics and freeing its resources).
  */
-static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
+void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
 {
     if (addr == 0)
         return;
@@ -407,7 +407,7 @@ static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_lo
     // If the address has no in-process consumer or destinations, it should be
     // deleted.
     //
-    if (addr->on_message == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 &&
+    if (DEQ_SIZE(addr->subscriptions) == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 &&
         DEQ_SIZE(addr->rnodes) == 0 && !addr->waypoint && !addr->block_deletion) {
         qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
         DEQ_REMOVE(core->addrs, addr);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 5da26fa..1007e8c 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -30,6 +30,7 @@ static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bo
 static void qdr_map_destination_CT   (qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdr_subscribe_CT         (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_unsubscribe_CT       (qdr_core_t *core, qdr_action_t *action, bool discard);
 
 static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
 
@@ -128,17 +129,39 @@ void qdr_core_route_table_handlers(qdr_core_t           *core,
 }
 
 
-void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
-                        qd_address_semantics_t sem, qdr_receive_t on_message, void *context)
+qdr_subscription_t *qdr_core_subscribe(qdr_core_t             *core,
+                                       const char             *address,
+                                       char                    aclass,
+                                       char                    phase,
+                                       qd_address_semantics_t  semantics,
+                                       qdr_receive_t           on_message,
+                                       void                   *context)
 {
+    qdr_subscription_t *sub = NEW(qdr_subscription_t);
+    sub->core               = core;
+    sub->addr               = 0;
+    sub->on_message         = on_message;
+    sub->on_message_context = context;
+
     qdr_action_t *action = qdr_action(qdr_subscribe_CT, "subscribe");
-    action->args.subscribe.address        = qdr_field(address);
-    action->args.subscribe.semantics      = sem;
-    action->args.subscribe.address_class  = aclass;
-    action->args.subscribe.address_phase  = phase;
-    action->args.subscribe.on_message     = on_message;
-    action->args.subscribe.context        = context;
+    action->args.io.address       = qdr_field(address);
+    action->args.io.address_class = aclass;
+    action->args.io.address_phase = phase;
+    action->args.io.subscription  = sub;
+    action->args.io.semantics     = semantics;
     qdr_action_enqueue(core, action);
+
+    return sub;
+}
+
+
+void qdr_core_unsubscribe(qdr_subscription_t *sub)
+{
+    if (sub) {
+        qdr_action_t *action = qdr_action(qdr_unsubscribe_CT, "unsubscribe");
+        action->args.io.subscription = sub;
+        qdr_action_enqueue(sub->core, action);
+    }
 }
 
 
@@ -169,64 +192,6 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
 }
 
 
-/**
- * Check an address to see if it no longer has any associated destinations.
- * Depending on its policy, the address may be eligible for being closed out
- * (i.e. Logging its terminal statistics and freeing its resources).
- */
-static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
-{
-    if (addr == 0)
-        return;
-
-    bool         to_delete      = false;
-    bool         no_more_locals = false;
-    qdr_field_t *key_field      = 0;
-
-    //
-    // If the address has no in-process consumer or destinations, it should be
-    // deleted.
-    //
-    if (addr->on_message == 0 &&
-        DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
-        !addr->waypoint && !addr->block_deletion)
-        to_delete = true;
-
-    //
-    // If we have just removed a local linkage and it was the last local linkage,
-    // we need to notify the router module that there is no longer a local
-    // presence of this address.
-    //
-    if (was_local && DEQ_SIZE(addr->rlinks) == 0) {
-        no_more_locals = true;
-        const unsigned char *key = qd_hash_key_by_handle(addr->hash_handle);
-        if (key && (key[0] == 'M' || key[0] == 'C' || key[0] == 'D'))
-            key_field = qdr_field((const char*) key);
-    }
-
-    if (to_delete) {
-        //
-        // Delete the address but grab the hash key so we can use it outside the
-        // critical section.
-        //
-        qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
-        DEQ_REMOVE(core->addrs, addr);
-        qd_hash_handle_free(addr->hash_handle);
-        free_qdr_address_t(addr);
-    }
-
-    //
-    // If the address is mobile-class and it was just removed from a local link,
-    // tell the router module that it is no longer attached locally.
-    //
-    if (no_more_locals && key_field) {
-        //
-        // TODO - Defer-call mobile-removed
-        //
-    }
-}
-
-
 static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
     int          router_maskbit = action->args.route_table.router_maskbit;
@@ -585,11 +550,12 @@ static void qdr_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, boo
 
 static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
 {
-    qdr_field_t *address = action->args.subscribe.address;
+    qdr_field_t        *address = action->args.io.address;
+    qdr_subscription_t *sub     = action->args.io.subscription;
 
     if (!discard) {
-        char aclass         = action->args.subscribe.address_class;
-        char phase          = action->args.subscribe.address_phase;
+        char aclass         = action->args.io.address_class;
+        char phase          = action->args.io.address_phase;
         qdr_address_t *addr = 0;
 
         qd_address_iterator_override_prefix(address->iterator, aclass);
@@ -599,24 +565,35 @@ static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discar
 
         qd_hash_retrieve(core->addr_hash, address->iterator, (void**) &addr);
         if (!addr) {
-            addr = qdr_address(action->args.subscribe.semantics);
+            addr = qdr_address(action->args.io.semantics);
             qd_hash_insert(core->addr_hash, address->iterator, addr, &addr->hash_handle);
             DEQ_ITEM_INIT(addr);
             DEQ_INSERT_TAIL(core->addrs, addr);
         }
 
-        if (!addr->on_message) {
-            addr->on_message         = action->args.subscribe.on_message;
-            addr->on_message_context = action->args.subscribe.context;
-        } else
-            qd_log(core->log, QD_LOG_CRITICAL,
-                   "qdr_core_subscribe: Multiple in-process subscriptions on the same address");
-    }
+        sub->addr = addr;
+        DEQ_ITEM_INIT(sub);
+        DEQ_INSERT_TAIL(addr->subscriptions, sub);
+    } else
+        free(sub);
 
     qdr_field_free(address);
 }
 
 
+static void qdr_unsubscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard) // action handler: detach an in-process subscription and free it
+{
+    qdr_subscription_t *sub = action->args.io.subscription;
+
+    if (!discard) {
+        DEQ_REMOVE(sub->addr->subscriptions, sub);
+        qdr_check_addr_CT(sub->core, sub->addr, false); // must see the real address: the check returns at once when handed a null addr
+        sub->addr = 0;                                  // clear the back-pointer only after the address has been checked
+    }
+
+    free(sub); // the subscription is released on both the normal and the discard path
+}
+
 //==================================================================================
 // Call-back Functions
 //==================================================================================

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0ee1604..1cc5cf2 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -76,16 +76,16 @@ struct qdr_action_t {
         } connection;
 
         //
-        // Arguments for in-process subscriptions
+        // Arguments for in-process messaging
         //
         struct {
             qdr_field_t            *address;
-            qd_address_semantics_t  semantics;
             char                    address_class;
             char                    address_phase;
-            qdr_receive_t           on_message;
-            void                   *context;
-        } subscribe;
+            qd_address_semantics_t  semantics;
+            qdr_subscription_t     *subscription;
+            qd_message_t           *message;
+        } io;
 
         //
         // Arguments for management-agent actions
@@ -204,21 +204,30 @@ struct qdr_lrp_ref_t {
 ALLOC_DECLARE(qdr_lrp_ref_t);
 DEQ_DECLARE(qdr_lrp_ref_t, qdr_lrp_ref_list_t);
 
+struct qdr_subscription_t {
+    DEQ_LINKS(qdr_subscription_t);
+    qdr_core_t    *core;
+    qdr_address_t *addr;
+    qdr_receive_t  on_message;
+    void          *on_message_context;
+};
+
+DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t);
+
 
 struct qdr_address_t {
     DEQ_LINKS(qdr_address_t);
-    qd_router_message_cb_t     on_message;          ///< In-Process Message Consumer
-    void                      *on_message_context;  ///< In-Process Consumer context
-    qdr_lrp_ref_list_t         lrps;                ///< Local link-route destinations
-    qdr_link_ref_list_t        rlinks;              ///< Locally-Connected Consumers
-    qdr_link_ref_list_t        inlinks;             ///< Locally-Connected Producers
-    qdr_router_ref_list_t      rnodes;              ///< Remotely-Connected Consumers
-    qd_hash_handle_t          *hash_handle;         ///< Linkage back to the hash table entry
-    qd_address_semantics_t     semantics;
-    bool                       toggle;
-    bool                       waypoint;
-    bool                       block_deletion;
-    qd_router_forwarder_t     *forwarder;
+    qdr_subscription_list_t  subscriptions; ///< In-process message subscribers
+    qdr_lrp_ref_list_t       lrps;          ///< Local link-route destinations
+    qdr_link_ref_list_t      rlinks;        ///< Locally-Connected Consumers
+    qdr_link_ref_list_t      inlinks;       ///< Locally-Connected Producers
+    qdr_router_ref_list_t    rnodes;        ///< Remotely-Connected Consumers
+    qd_hash_handle_t        *hash_handle;   ///< Linkage back to the hash table entry
+    qd_address_semantics_t   semantics;
+    bool                     toggle;
+    bool                     waypoint;
+    bool                     block_deletion;
+    qd_router_forwarder_t   *forwarder;
 
     /**@name Statistics */
     ///@{
@@ -384,6 +393,7 @@ void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash);
 void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
 
 void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
+void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
 
 qdr_query_t *qdr_query(qdr_core_t              *core,
                        void                    *context,

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index bdca23a..ce1e850 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -269,7 +269,7 @@ class BusManager(Node):
         heads.append(Header("class"))
         heads.append(Header("address"))
         heads.append(Header("phase"))
-        heads.append(Header("in-proc", Header.Y))
+        heads.append(Header("in-proc", Header.COMMAS))
         heads.append(Header("local", Header.COMMAS))
         heads.append(Header("remote", Header.COMMAS))
         heads.append(Header("in", Header.COMMAS))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-dispatch git commit: DISPATCH-179 - Introduced 'T'-class addresses to replace the old "is-local" condition. T-class addresses are topological addresses that match "all" routers in an area or in all areas.

Posted by tr...@apache.org.
DISPATCH-179 - Introduced 'T'-class addresses to replace the old "is-local" condition.
               T-class addresses are topological addresses that match "all" routers in
               an area or in all areas.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8be2a0a9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8be2a0a9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8be2a0a9

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 8be2a0a9294b1fead41c258262096c43956fb5b2
Parents: 363baae
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Dec 22 10:14:02 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 22 10:14:02 2015 -0500

----------------------------------------------------------------------
 src/iterator.c             |  7 +++-
 src/router_core/transfer.c | 84 +++++++++++++++++++++++++++++++++++++++++
 tests/field_test.c         |  8 ++--
 3 files changed, 94 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8be2a0a9/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 7a8f0ab..686447d 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -110,7 +110,12 @@ static void parse_address_view(qd_field_iterator_t *iter)
 
         if (qd_field_iterator_prefix(iter, "topo/")) {
             if (qd_field_iterator_prefix(iter, "all/") || qd_field_iterator_prefix(iter, my_area)) {
-                if (qd_field_iterator_prefix(iter, "all/") || qd_field_iterator_prefix(iter, my_router)) {
+                if (qd_field_iterator_prefix(iter, "all/")) {
+                    iter->prefix      = 'T';
+                    iter->state       = STATE_AT_PREFIX;
+                    iter->view_prefix = true;
+                    return;
+                } else if (qd_field_iterator_prefix(iter, my_router)) {
                     iter->prefix      = 'L';
                     iter->state       = STATE_AT_PREFIX;
                     iter->view_prefix = true;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8be2a0a9/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
new file mode 100644
index 0000000..d8a05ee
--- /dev/null
+++ b/src/router_core/transfer.c
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+#include <qpid/dispatch/amqp.h>
+#include <stdio.h>
+
+static void qdr_link_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);    // name matches the handler defined and used below
+static void qdr_link_delivery_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard); // name matches the handler defined and used below
+static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+//==================================================================================
+// Interface Functions
+//==================================================================================
+
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg)
+{
+    qdr_action_t *action = qdr_action(qdr_link_delivery_CT, "link_delivery"); // WIP: link, delivery and msg are not yet stored on the action
+
+    qdr_action_enqueue(core, action); // NOTE(review): no `core` is declared in this function's scope -- confirm where it should come from
+} // NOTE(review): declared to return qdr_delivery_t* but no value is returned
+
+
+qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr)
+{
+    qdr_action_t *action = qdr_action(qdr_link_delivery_to_CT, "link_delivery_to"); // WIP: link, delivery, msg and addr are not yet stored on the action
+
+    qdr_action_enqueue(core, action); // NOTE(review): no `core` is declared in this function's scope -- confirm where it should come from
+} // NOTE(review): declared to return qdr_delivery_t* but no value is returned
+
+
+void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess)
+{
+    qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to"); // WIP: msg, addr and exclude_inprocess are not yet attached to the action
+
+    qdr_action_enqueue(core, action);
+}
+
+
+//==================================================================================
+// In-Thread Functions
+//==================================================================================
+
+static void qdr_link_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard) // WIP stub: does no work whether or not discard is set
+{
+    if (discard)
+        return;
+}
+
+
+static void qdr_link_delivery_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) // WIP stub: does no work whether or not discard is set
+{
+    if (discard)
+        return;
+}
+
+
+static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard) // WIP stub: does no work whether or not discard is set
+{
+    if (discard)
+        return;
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8be2a0a9/tests/field_test.c
----------------------------------------------------------------------
diff --git a/tests/field_test.c b/tests/field_test.c
index a093505..e1ec6c6 100644
--- a/tests/field_test.c
+++ b/tests/field_test.c
@@ -159,8 +159,8 @@ static char* test_view_address_hash(void *context)
     {"amqp:/_topo/my-area/router/local/sub",    "Rrouter"},
     {"amqp:/_topo/my-area/my-router/local/sub", "Llocal/sub"},
     {"amqp:/_topo/area/all/local/sub",          "Aarea"},
-    {"amqp:/_topo/my-area/all/local/sub",       "Llocal/sub"},
-    {"amqp:/_topo/all/all/local/sub",           "Llocal/sub"},
+    {"amqp:/_topo/my-area/all/local/sub",       "Tlocal/sub"},
+    {"amqp:/_topo/all/all/local/sub",           "Tlocal/sub"},
     {"amqp://host:port/_local/my-addr",         "Lmy-addr"},
     {"_topo/area/router/my-addr",               "Aarea"},
     {"_topo/my-area/router/my-addr",            "Rrouter"},
@@ -175,8 +175,8 @@ static char* test_view_address_hash(void *context)
     {"amqp:/_topo/my-area/router/local/sub.",    "Rrouter"},
     {"amqp:/_topo/my-area/my-router/local/sub.", "Llocal/sub"},
     {"amqp:/_topo/area/all/local/sub.",          "Aarea"},
-    {"amqp:/_topo/my-area/all/local/sub.",       "Llocal/sub"},
-    {"amqp:/_topo/all/all/local/sub.",           "Llocal/sub"},
+    {"amqp:/_topo/my-area/all/local/sub.",       "Tlocal/sub"},
+    {"amqp:/_topo/all/all/local/sub.",           "Tlocal/sub"},
     {"amqp://host:port/_local/my-addr.",         "Lmy-addr"},
     {"_topo/area/router/my-addr.",               "Aarea"},
     {"_topo/my-area/router/my-addr.",            "Rrouter"},


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org