You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/12/22 17:46:17 UTC
[1/2] qpid-dispatch git commit: DISPATCH-179 - WIP - Formalized the
notion of in-process subscription in preparation for implementing message
delivery through the router core.
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 d218eeeaa -> 8be2a0a92
DISPATCH-179 - WIP - Formalized the notion of in-process subscription in preparation for
implementing message delivery through the router core.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/363baaec
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/363baaec
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/363baaec
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 363baaec0dbd9ded69bf33498151c2c777fc1c1c
Parents: d218eee
Author: Ted Ross <tr...@redhat.com>
Authored: Mon Dec 21 11:15:11 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Mon Dec 21 11:15:11 2015 -0500
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 39 ++++--
python/qpid_dispatch/management/qdrouter.json | 2 +-
src/router_core/agent_address.c | 2 +-
src/router_core/connections.c | 4 +-
src/router_core/route_tables.c | 131 +++++++++------------
src/router_core/router_core_private.h | 44 ++++---
tools/qdstat | 2 +-
7 files changed, 116 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 9616b6b..39d8933 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -32,12 +32,13 @@
* exclusive access to that connection.
*/
-typedef struct qdr_core_t qdr_core_t;
-typedef struct qdr_connection_t qdr_connection_t;
-typedef struct qdr_link_t qdr_link_t;
-typedef struct qdr_delivery_t qdr_delivery_t;
-typedef struct qdr_terminus_t qdr_terminus_t;
-typedef struct qdr_error_t qdr_error_t;
+typedef struct qdr_core_t qdr_core_t;
+typedef struct qdr_connection_t qdr_connection_t;
+typedef struct qdr_link_t qdr_link_t;
+typedef struct qdr_delivery_t qdr_delivery_t;
+typedef struct qdr_terminus_t qdr_terminus_t;
+typedef struct qdr_error_t qdr_error_t;
+typedef struct qdr_subscription_t qdr_subscription_t;
/**
* Allocate and start an instance of the router core module.
@@ -76,13 +77,20 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
/**
******************************************************************************
- * In-process message-receiver functions
+ * In-process messaging functions
******************************************************************************
*/
typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit);
-void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
- qd_address_semantics_t sem, qdr_receive_t on_message, void *context);
+qdr_subscription_t *qdr_core_subscribe(qdr_core_t *core,
+ const char *address,
+ char aclass,
+ char phase,
+ qd_address_semantics_t semantics,
+ qdr_receive_t on_message,
+ void *context);
+
+void qdr_core_unsubscribe(qdr_subscription_t *sub);
/**
@@ -392,6 +400,19 @@ void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error);
qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg);
qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr);
+/**
+ * qdr_send_to
+ *
+ * Send a message to a destination. This function is used only by in-process components that
+ * create messages to be sent. For these messages, there is no inbound link or delivery.
+ *
+ * @param core Pointer to the core module
+ * @param msg Pointer to the message to be sent. The message will be copied during the call
+ * and must be freed by the caller if the caller doesn't need to hold it for later use.
+ * @param addr Address of the destination to which the message is sent.
+ * @param exclude_inprocess If true, the message will not be sent to in-process subscribers.
+ */
+void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess);
+
typedef void (*qdr_link_first_attach_t) (void *context, qdr_connection_t *conn, qdr_link_t *link,
qdr_terminus_t *source, qdr_terminus_t *target);
typedef void (*qdr_link_second_attach_t) (void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/python/qpid_dispatch/management/qdrouter.json
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch/management/qdrouter.json b/python/qpid_dispatch/management/qdrouter.json
index 41b628a..dd54457 100644
--- a/python/qpid_dispatch/management/qdrouter.json
+++ b/python/qpid_dispatch/management/qdrouter.json
@@ -846,7 +846,7 @@
"description": "AMQP address managed by the router.",
"extends": "operationalEntity",
"attributes": {
- "inProcess": {"type": "boolean"},
+ "inProcess": {"type": "integer"},
"subscriberCount": {"type": "integer", "graph": true},
"remoteCount": {"type": "integer", "graph": true},
"deliveriesIngress": {"type": "integer", "graph": true},
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index ef8155b..e8e82ac 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -54,7 +54,7 @@ static void qdr_insert_address_columns_CT(qdr_address_t *addr,
break;
case QDR_ADDRESS_IN_PROCESS:
- qd_compose_insert_bool(body, addr->on_message != 0);
+ qd_compose_insert_uint(body, DEQ_SIZE(addr->subscriptions));
break;
case QDR_ADDRESS_SUBSCRIBER_COUNT:
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index db469f6..bb6aad9 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -387,7 +387,7 @@ static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_fie
* Depending on its policy, the address may be eligible for being closed out
* (i.e. Logging its terminal statistics and freeing its resources).
*/
-static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
+void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
{
if (addr == 0)
return;
@@ -407,7 +407,7 @@ static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_lo
// If the address has no in-process consumer or destinations, it should be
// deleted.
//
- if (addr->on_message == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 &&
+ if (DEQ_SIZE(addr->subscriptions) == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 &&
DEQ_SIZE(addr->rnodes) == 0 && !addr->waypoint && !addr->block_deletion) {
qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
DEQ_REMOVE(core->addrs, addr);
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index 5da26fa..1007e8c 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -30,6 +30,7 @@ static void qdr_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bo
static void qdr_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdr_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_unsubscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
@@ -128,17 +129,39 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
}
-void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
- qd_address_semantics_t sem, qdr_receive_t on_message, void *context)
+/**
+ * Create an in-process subscription and queue a "subscribe" action for it.
+ *
+ * A qdr_subscription_t is allocated and filled in with the core, the
+ * callback and the callback context.  The address string, address class,
+ * address phase, semantics and the new subscription are stored in the
+ * action's 'io' arguments and the action is enqueued on 'core'.
+ *
+ * @return The newly allocated subscription.  It is returned right after the
+ *         action is enqueued; its 'addr' field is still 0 at that point.
+ */
+qdr_subscription_t *qdr_core_subscribe(qdr_core_t *core,
+ const char *address,
+ char aclass,
+ char phase,
+ qd_address_semantics_t semantics,
+ qdr_receive_t on_message,
+ void *context)
{
+ qdr_subscription_t *sub = NEW(qdr_subscription_t);
+ sub->core = core;
+ // No address yet: qdr_subscribe_CT assigns sub->addr when it handles the action.
+ sub->addr = 0;
+ sub->on_message = on_message;
+ sub->on_message_context = context;
+
qdr_action_t *action = qdr_action(qdr_subscribe_CT, "subscribe");
- action->args.subscribe.address = qdr_field(address);
- action->args.subscribe.semantics = sem;
- action->args.subscribe.address_class = aclass;
- action->args.subscribe.address_phase = phase;
- action->args.subscribe.on_message = on_message;
- action->args.subscribe.context = context;
+ action->args.io.address = qdr_field(address);
+ action->args.io.address_class = aclass;
+ action->args.io.address_phase = phase;
+ action->args.io.subscription = sub;
+ action->args.io.semantics = semantics;
qdr_action_enqueue(core, action);
+
+ return sub;
+}
+
+
+/**
+ * Request removal of an in-process subscription.
+ *
+ * A null 'sub' is ignored.  Otherwise an "unsubscribe" action that carries
+ * the subscription is enqueued on the core recorded in the subscription.
+ *
+ * @param sub Subscription previously returned by qdr_core_subscribe, or null.
+ */
+void qdr_core_unsubscribe(qdr_subscription_t *sub)
+{
+    if (!sub)
+        return;
+
+    qdr_action_t *request = qdr_action(qdr_unsubscribe_CT, "unsubscribe");
+    request->args.io.subscription = sub;
+    qdr_action_enqueue(sub->core, request);
+}
@@ -169,64 +192,6 @@ void qdr_route_table_setup_CT(qdr_core_t *core)
}
-/**
- * Check an address to see if it no longer has any associated destinations.
- * Depending on its policy, the address may be eligible for being closed out
- * (i.e. Logging its terminal statistics and freeing its resources).
- */
-static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local)
-{
- if (addr == 0)
- return;
-
- bool to_delete = false;
- bool no_more_locals = false;
- qdr_field_t *key_field = 0;
-
- //
- // If the address has no in-process consumer or destinations, it should be
- // deleted.
- //
- if (addr->on_message == 0 &&
- DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 &&
- !addr->waypoint && !addr->block_deletion)
- to_delete = true;
-
- //
- // If we have just removed a local linkage and it was the last local linkage,
- // we need to notify the router module that there is no longer a local
- // presence of this address.
- //
- if (was_local && DEQ_SIZE(addr->rlinks) == 0) {
- no_more_locals = true;
- const unsigned char *key = qd_hash_key_by_handle(addr->hash_handle);
- if (key && (key[0] == 'M' || key[0] == 'C' || key[0] == 'D'))
- key_field = qdr_field((const char*) key);
- }
-
- if (to_delete) {
- //
- // Delete the address but grab the hash key so we can use it outside the
- // critical section.
- //
- qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle);
- DEQ_REMOVE(core->addrs, addr);
- qd_hash_handle_free(addr->hash_handle);
- free_qdr_address_t(addr);
- }
-
- //
- // If the address is mobile-class and it was just removed from a local link,
- // tell the router module that it is no longer attached locally.
- //
- if (no_more_locals && key_field) {
- //
- // TODO - Defer-call mobile-removed
- //
- }
-}
-
-
static void qdr_add_router_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
int router_maskbit = action->args.route_table.router_maskbit;
@@ -585,11 +550,12 @@ static void qdr_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, boo
static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
{
- qdr_field_t *address = action->args.subscribe.address;
+ qdr_field_t *address = action->args.io.address;
+ qdr_subscription_t *sub = action->args.io.subscription;
if (!discard) {
- char aclass = action->args.subscribe.address_class;
- char phase = action->args.subscribe.address_phase;
+ char aclass = action->args.io.address_class;
+ char phase = action->args.io.address_phase;
qdr_address_t *addr = 0;
qd_address_iterator_override_prefix(address->iterator, aclass);
@@ -599,24 +565,35 @@ static void qdr_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discar
qd_hash_retrieve(core->addr_hash, address->iterator, (void**) &addr);
if (!addr) {
- addr = qdr_address(action->args.subscribe.semantics);
+ addr = qdr_address(action->args.io.semantics);
qd_hash_insert(core->addr_hash, address->iterator, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(core->addrs, addr);
}
- if (!addr->on_message) {
- addr->on_message = action->args.subscribe.on_message;
- addr->on_message_context = action->args.subscribe.context;
- } else
- qd_log(core->log, QD_LOG_CRITICAL,
- "qdr_core_subscribe: Multiple in-process subscriptions on the same address");
- }
+ sub->addr = addr;
+ DEQ_ITEM_INIT(sub);
+ DEQ_INSERT_TAIL(addr->subscriptions, sub);
+ } else
+ free(sub);
qdr_field_free(address);
}
+/**
+ * Handler for the "unsubscribe" action.
+ *
+ * Unless the action is being discarded, the subscription is unlinked from
+ * its address and that address is handed to qdr_check_addr_CT so it can be
+ * checked for deletion.  The subscription itself is freed in every case.
+ *
+ * @param core    Pointer to the core module (the subscription's own core
+ *                pointer is what is passed on to qdr_check_addr_CT).
+ * @param action  Action whose args.io.subscription is the subscription to remove.
+ * @param discard If true, skip the unlink/check step and only free the subscription.
+ */
+static void qdr_unsubscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qdr_subscription_t *sub = action->args.io.subscription;
+
+    if (!discard) {
+        //
+        // Remember the address before clearing the back-pointer.  Passing
+        // sub->addr after it has been zeroed would hand qdr_check_addr_CT a
+        // null address, which it ignores, so the address would never be
+        // checked for deletion.
+        //
+        qdr_address_t *addr = sub->addr;
+
+        if (addr) {
+            DEQ_REMOVE(addr->subscriptions, sub);
+            sub->addr = 0;
+            qdr_check_addr_CT(sub->core, addr, false);
+        }
+    }
+
+    free(sub);
+}
+
//==================================================================================
// Call-back Functions
//==================================================================================
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 0ee1604..1cc5cf2 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -76,16 +76,16 @@ struct qdr_action_t {
} connection;
//
- // Arguments for in-process subscriptions
+ // Arguments for in-process messaging
//
struct {
qdr_field_t *address;
- qd_address_semantics_t semantics;
char address_class;
char address_phase;
- qdr_receive_t on_message;
- void *context;
- } subscribe;
+ qd_address_semantics_t semantics;
+ qdr_subscription_t *subscription;
+ qd_message_t *message;
+ } io;
//
// Arguments for management-agent actions
@@ -204,21 +204,30 @@ struct qdr_lrp_ref_t {
ALLOC_DECLARE(qdr_lrp_ref_t);
DEQ_DECLARE(qdr_lrp_ref_t, qdr_lrp_ref_list_t);
+/**
+ * Record of one in-process subscription.  Instances are allocated by
+ * qdr_core_subscribe and kept on the 'subscriptions' list of a qdr_address_t.
+ */
+struct qdr_subscription_t {
+ DEQ_LINKS(qdr_subscription_t);
+ qdr_core_t *core; ///< Core passed to qdr_core_subscribe; used to enqueue the unsubscribe action
+ qdr_address_t *addr; ///< Address the subscription is listed on; 0 until qdr_subscribe_CT sets it
+ qdr_receive_t on_message; ///< Callback supplied by the subscriber
+ void *on_message_context; ///< Context pointer supplied together with the callback
+};
+
+DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t);
+
struct qdr_address_t {
DEQ_LINKS(qdr_address_t);
- qd_router_message_cb_t on_message; ///< In-Process Message Consumer
- void *on_message_context; ///< In-Process Consumer context
- qdr_lrp_ref_list_t lrps; ///< Local link-route destinations
- qdr_link_ref_list_t rlinks; ///< Locally-Connected Consumers
- qdr_link_ref_list_t inlinks; ///< Locally-Connected Producers
- qdr_router_ref_list_t rnodes; ///< Remotely-Connected Consumers
- qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
- qd_address_semantics_t semantics;
- bool toggle;
- bool waypoint;
- bool block_deletion;
- qd_router_forwarder_t *forwarder;
+ qdr_subscription_list_t subscriptions; ///< In-process message subscribers
+ qdr_lrp_ref_list_t lrps; ///< Local link-route destinations
+ qdr_link_ref_list_t rlinks; ///< Locally-Connected Consumers
+ qdr_link_ref_list_t inlinks; ///< Locally-Connected Producers
+ qdr_router_ref_list_t rnodes; ///< Remotely-Connected Consumers
+ qd_hash_handle_t *hash_handle; ///< Linkage back to the hash table entry
+ qd_address_semantics_t semantics;
+ bool toggle;
+ bool waypoint;
+ bool block_deletion;
+ qd_router_forwarder_t *forwarder;
/**@name Statistics */
///@{
@@ -384,6 +393,7 @@ void qdr_post_mobile_removed_CT(qdr_core_t *core, const char *address_hash);
void qdr_post_link_lost_CT(qdr_core_t *core, int link_maskbit);
void qdr_post_general_work_CT(qdr_core_t *core, qdr_general_work_t *work);
+void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local);
qdr_query_t *qdr_query(qdr_core_t *core,
void *context,
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/363baaec/tools/qdstat
----------------------------------------------------------------------
diff --git a/tools/qdstat b/tools/qdstat
index bdca23a..ce1e850 100755
--- a/tools/qdstat
+++ b/tools/qdstat
@@ -269,7 +269,7 @@ class BusManager(Node):
heads.append(Header("class"))
heads.append(Header("address"))
heads.append(Header("phase"))
- heads.append(Header("in-proc", Header.Y))
+ heads.append(Header("in-proc", Header.COMMAS))
heads.append(Header("local", Header.COMMAS))
heads.append(Header("remote", Header.COMMAS))
heads.append(Header("in", Header.COMMAS))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-dispatch git commit: DISPATCH-179 - Introduced 'T'-class
addresses to replace the old "is-local" condition. T-class addresses are
topological addresses that match "all" routers in an area or in all areas.
Posted by tr...@apache.org.
DISPATCH-179 - Introduced 'T'-class addresses to replace the old "is-local" condition.
T-class addresses are topological addresses that match "all" routers in
an area or in all areas.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/8be2a0a9
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/8be2a0a9
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/8be2a0a9
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 8be2a0a9294b1fead41c258262096c43956fb5b2
Parents: 363baae
Author: Ted Ross <tr...@redhat.com>
Authored: Tue Dec 22 10:14:02 2015 -0500
Committer: Ted Ross <tr...@redhat.com>
Committed: Tue Dec 22 10:14:02 2015 -0500
----------------------------------------------------------------------
src/iterator.c | 7 +++-
src/router_core/transfer.c | 84 +++++++++++++++++++++++++++++++++++++++++
tests/field_test.c | 8 ++--
3 files changed, 94 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8be2a0a9/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 7a8f0ab..686447d 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -110,7 +110,12 @@ static void parse_address_view(qd_field_iterator_t *iter)
if (qd_field_iterator_prefix(iter, "topo/")) {
if (qd_field_iterator_prefix(iter, "all/") || qd_field_iterator_prefix(iter, my_area)) {
- if (qd_field_iterator_prefix(iter, "all/") || qd_field_iterator_prefix(iter, my_router)) {
+ if (qd_field_iterator_prefix(iter, "all/")) {
+ iter->prefix = 'T';
+ iter->state = STATE_AT_PREFIX;
+ iter->view_prefix = true;
+ return;
+ } else if (qd_field_iterator_prefix(iter, my_router)) {
iter->prefix = 'L';
iter->state = STATE_AT_PREFIX;
iter->view_prefix = true;
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8be2a0a9/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
new file mode 100644
index 0000000..d8a05ee
--- /dev/null
+++ b/src/router_core/transfer.c
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "router_core_private.h"
+#include <qpid/dispatch/amqp.h>
+#include <stdio.h>
+
+//
+// Forward declarations of the action handlers defined at the end of this
+// file.  The names must match the definitions and the qdr_action() calls
+// below, which are spelled "delivery", not "deliver".
+//
+static void qdr_link_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_link_delivery_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard);
+
+
+//==================================================================================
+// Internal Functions
+//==================================================================================
+
+//==================================================================================
+// Interface Functions
+//==================================================================================
+
+/**
+ * Create a "link_delivery" action bound to qdr_link_delivery_CT and enqueue it.
+ *
+ * NOTE(review): work in progress.  'link', 'delivery' and 'msg' are not
+ * stored in the action, 'core' is neither a parameter nor a local of this
+ * function, and no qdr_delivery_t pointer is returned although the
+ * signature promises one.
+ */
+qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg)
+{
+ qdr_action_t *action = qdr_action(qdr_link_delivery_CT, "link_delivery");
+
+ qdr_action_enqueue(core, action);
+}
+
+
+/**
+ * Create a "link_delivery_to" action bound to qdr_link_delivery_to_CT and
+ * enqueue it.
+ *
+ * NOTE(review): work in progress.  'link', 'delivery', 'msg' and 'addr' are
+ * not stored in the action, 'core' is neither a parameter nor a local of
+ * this function, and no qdr_delivery_t pointer is returned although the
+ * signature promises one.
+ */
+qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr)
+{
+ qdr_action_t *action = qdr_action(qdr_link_delivery_to_CT, "link_delivery_to");
+
+ qdr_action_enqueue(core, action);
+}
+
+
+/**
+ * Create a "send_to" action bound to qdr_send_to_CT and enqueue it on 'core'.
+ *
+ * NOTE(review): work in progress.  'msg', 'addr' and 'exclude_inprocess' are
+ * not yet copied into the action, so the handler receives none of them.
+ */
+void qdr_send_to(qdr_core_t *core, qd_message_t *msg, const char *addr, bool exclude_inprocess)
+{
+ qdr_action_t *action = qdr_action(qdr_send_to_CT, "send_to");
+
+ qdr_action_enqueue(core, action);
+}
+
+
+//==================================================================================
+// In-Thread Functions
+//==================================================================================
+
+/**
+ * Handler for the "link_delivery" action.  Currently a stub: it returns
+ * without doing any work whether or not 'discard' is set.
+ */
+static void qdr_link_delivery_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+}
+
+
+/**
+ * Handler for the "link_delivery_to" action.  Currently a stub: it returns
+ * without doing any work whether or not 'discard' is set.
+ */
+static void qdr_link_delivery_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+}
+
+
+/**
+ * Handler for the "send_to" action.  Currently a stub: it returns without
+ * doing any work whether or not 'discard' is set.
+ */
+static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+ if (discard)
+ return;
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/8be2a0a9/tests/field_test.c
----------------------------------------------------------------------
diff --git a/tests/field_test.c b/tests/field_test.c
index a093505..e1ec6c6 100644
--- a/tests/field_test.c
+++ b/tests/field_test.c
@@ -159,8 +159,8 @@ static char* test_view_address_hash(void *context)
{"amqp:/_topo/my-area/router/local/sub", "Rrouter"},
{"amqp:/_topo/my-area/my-router/local/sub", "Llocal/sub"},
{"amqp:/_topo/area/all/local/sub", "Aarea"},
- {"amqp:/_topo/my-area/all/local/sub", "Llocal/sub"},
- {"amqp:/_topo/all/all/local/sub", "Llocal/sub"},
+ {"amqp:/_topo/my-area/all/local/sub", "Tlocal/sub"},
+ {"amqp:/_topo/all/all/local/sub", "Tlocal/sub"},
{"amqp://host:port/_local/my-addr", "Lmy-addr"},
{"_topo/area/router/my-addr", "Aarea"},
{"_topo/my-area/router/my-addr", "Rrouter"},
@@ -175,8 +175,8 @@ static char* test_view_address_hash(void *context)
{"amqp:/_topo/my-area/router/local/sub.", "Rrouter"},
{"amqp:/_topo/my-area/my-router/local/sub.", "Llocal/sub"},
{"amqp:/_topo/area/all/local/sub.", "Aarea"},
- {"amqp:/_topo/my-area/all/local/sub.", "Llocal/sub"},
- {"amqp:/_topo/all/all/local/sub.", "Llocal/sub"},
+ {"amqp:/_topo/my-area/all/local/sub.", "Tlocal/sub"},
+ {"amqp:/_topo/all/all/local/sub.", "Tlocal/sub"},
{"amqp://host:port/_local/my-addr.", "Lmy-addr"},
{"_topo/area/router/my-addr.", "Aarea"},
{"_topo/my-area/router/my-addr.", "Rrouter"},
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org