You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/28 22:57:08 UTC
qpid-dispatch git commit: DISPATCH-179 - Implemented the
core_subscribe action.
Repository: qpid-dispatch
Updated Branches:
refs/heads/tross-DISPATCH-179-1 61be97fdf -> 22c856834
DISPATCH-179 - Implemented the core_subscribe action.
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/22c85683
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/22c85683
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/22c85683
Branch: refs/heads/tross-DISPATCH-179-1
Commit: 22c856834295f24c2c66e1614619b337267afb70
Parents: 61be97f
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 28 17:56:23 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 28 17:56:23 2015 -0400
----------------------------------------------------------------------
include/qpid/dispatch/router_core.h | 3 +-
src/router_core/route_tables.c | 48 ++++++++++++++++++++++++++++++
src/router_core/router_core_private.h | 21 ++++++++++++-
3 files changed, 70 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22c85683/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 2e11139..fd19c82 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -76,7 +76,8 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
*/
typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit);
-void qdr_core_subscribe(qdr_core_t *core, const char *address, bool local, bool mobile, qdr_receive_t on_message, void *context);
+void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
+ qd_address_semantics_t sem, qdr_receive_t on_message, void *context);
/**
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22c85683/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index b71564e..1c96657 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -29,6 +29,7 @@ static void qdrh_remove_next_hop_CT (qdr_core_t *core, qdr_action_t *action, b
static void qdrh_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdrh_map_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static void qdrh_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_subscribe_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
@@ -132,6 +133,20 @@ void qdr_core_route_table_handlers(qdr_core_t *core,
}
+/**
+ * Request an in-process subscription on an address.
+ *
+ * Nothing is looked up or modified here: the arguments are packed into an
+ * action bound to qdrh_subscribe_CT and the action is enqueued on the core.
+ *
+ * @param core        Core on which the action is enqueued.
+ * @param address     Address text; wrapped with qdr_field() for the handler,
+ *                    which releases it with qdr_field_free().
+ * @param aclass      Address class character, used by the handler as the
+ *                    iterator prefix override.
+ * @param phase       Address phase; the handler applies it only when aclass is 'M'.
+ * @param sem         Semantics used if the handler has to create the address.
+ * @param on_message  Callback stored on the address by the handler.
+ * @param context     Opaque value stored on the address next to on_message.
+ */
+void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
+                        qd_address_semantics_t sem, qdr_receive_t on_message, void *context)
+{
+    qdr_action_t *subscribe_action = qdr_action(qdrh_subscribe_CT);
+
+    // Callback and its opaque context.
+    subscribe_action->args.subscribe.on_message = on_message;
+    subscribe_action->args.subscribe.context    = context;
+
+    // Description of the address being subscribed to.
+    subscribe_action->args.subscribe.aclass     = aclass;
+    subscribe_action->args.subscribe.phase      = phase;
+    subscribe_action->args.subscribe.semantics  = sem;
+    subscribe_action->args.subscribe.address    = qdr_field(address);
+
+    qdr_action_enqueue(core, subscribe_action);
+}
+
+
//==================================================================================
// In-Thread Functions
//==================================================================================
@@ -567,3 +582,36 @@ static void qdrh_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bo
}
+/**
+ * Action handler for the request queued by qdr_core_subscribe.
+ *
+ * When the action is not discarded, the address is looked up in
+ * core->addr_hash (created, hashed and appended to core->addrs if absent)
+ * and the subscriber's callback and context are stored on it.  An address
+ * that already has a callback keeps it, and a critical message is logged.
+ *
+ * The address field carried by the action is freed on every path,
+ * including the discard path.
+ *
+ * @param core     Core that owns the address hash, address list and log.
+ * @param action   Action whose args.subscribe members carry the request.
+ * @param discard  If true, skip all table work and only free the field.
+ */
+static void qdrh_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{
+    qdr_field_t *addr_field = action->args.subscribe.address;
+
+    //
+    // A discarded action only needs its address field released.
+    //
+    if (discard) {
+        qdr_field_free(addr_field);
+        return;
+    }
+
+    char           addr_class = action->args.subscribe.aclass;
+    char           addr_phase = action->args.subscribe.phase;
+    qdr_address_t *entry      = 0;
+
+    //
+    // Set up the iterator before hashing: override the prefix with the
+    // address class, apply the phase for class 'M' only, then switch the
+    // iterator to the address-hash view.
+    //
+    qd_address_iterator_override_prefix(addr_field->iterator, addr_class);
+    if (addr_class == 'M') {
+        qd_address_iterator_set_phase(addr_field->iterator, addr_phase);
+    }
+    qd_address_iterator_reset_view(addr_field->iterator, ITER_VIEW_ADDRESS_HASH);
+
+    //
+    // Find the address, creating and registering it if it is not yet known.
+    //
+    qd_hash_retrieve(core->addr_hash, addr_field->iterator, (void**) &entry);
+    if (entry == 0) {
+        entry = qdr_address(action->args.subscribe.semantics);
+        qd_hash_insert(core->addr_hash, addr_field->iterator, entry, &entry->hash_handle);
+        DEQ_ITEM_INIT(entry);
+        DEQ_INSERT_TAIL(core->addrs, entry);
+    }
+
+    //
+    // Only one in-process callback is kept per address; a later
+    // subscription is reported and otherwise ignored.
+    //
+    if (entry->on_message) {
+        qd_log(core->log, QD_LOG_CRITICAL,
+               "qdr_core_subscribe: Multiple in-process subscriptions on the same address");
+    } else {
+        entry->on_message         = action->args.subscribe.on_message;
+        entry->on_message_context = action->args.subscribe.context;
+    }
+
+    qdr_field_free(addr_field);
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22c85683/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 5dd1d84..5cb14a8 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -48,6 +48,9 @@ struct qdr_action_t {
DEQ_LINKS(qdr_action_t);
qdr_action_handler_t action_handler;
union {
+ //
+ // Arguments for router control-plane actions
+ //
struct {
int link_maskbit;
int router_maskbit;
@@ -58,9 +61,25 @@ struct qdr_action_t {
char address_phase;
qd_address_semantics_t semantics;
} route_table;
+
+ //
+ // Arguments for in-process subscriptions
+ //
+ struct {
+ qdr_field_t *address;
+ qd_address_semantics_t semantics;
+ char aclass;
+ char phase;
+ qdr_receive_t on_message;
+ void *context;
+ } subscribe;
+
+ //
+ // Arguments for management-agent actions
+ //
struct {
qdr_query_t *query;
- int offset;
+ int offset;
} agent;
} args;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org