You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/10/28 22:57:08 UTC

qpid-dispatch git commit: DISPATCH-179 - Implemented the core_subscribe action.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 61be97fdf -> 22c856834


DISPATCH-179 - Implemented the core_subscribe action.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/22c85683
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/22c85683
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/22c85683

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 22c856834295f24c2c66e1614619b337267afb70
Parents: 61be97f
Author: Ted Ross <tr...@redhat.com>
Authored: Wed Oct 28 17:56:23 2015 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Wed Oct 28 17:56:23 2015 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/router_core.h   |  3 +-
 src/router_core/route_tables.c        | 48 ++++++++++++++++++++++++++++++
 src/router_core/router_core_private.h | 21 ++++++++++++-
 3 files changed, 70 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22c85683/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 2e11139..fd19c82 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -76,7 +76,8 @@ void qdr_core_route_table_handlers(qdr_core_t           *core,
  */
 typedef void (*qdr_receive_t) (void *context, qd_message_t *msg, int link_maskbit);
 
-void qdr_core_subscribe(qdr_core_t *core, const char *address, bool local, bool mobile, qdr_receive_t on_message, void *context);
+void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
+                        qd_address_semantics_t sem, qdr_receive_t on_message, void *context);
 
 
 /**

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22c85683/src/router_core/route_tables.c
----------------------------------------------------------------------
diff --git a/src/router_core/route_tables.c b/src/router_core/route_tables.c
index b71564e..1c96657 100644
--- a/src/router_core/route_tables.c
+++ b/src/router_core/route_tables.c
@@ -29,6 +29,7 @@ static void qdrh_remove_next_hop_CT   (qdr_core_t *core, qdr_action_t *action, b
 static void qdrh_set_valid_origins_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdrh_map_destination_CT   (qdr_core_t *core, qdr_action_t *action, bool discard);
 static void qdrh_unmap_destination_CT (qdr_core_t *core, qdr_action_t *action, bool discard);
+static void qdrh_subscribe_CT         (qdr_core_t *core, qdr_action_t *action, bool discard);
 
 static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
 
@@ -132,6 +133,20 @@ void qdr_core_route_table_handlers(qdr_core_t           *core,
 }
 
 
+void qdr_core_subscribe(qdr_core_t *core, const char *address, char aclass, char phase,
+                        qd_address_semantics_t sem, qdr_receive_t on_message, void *context)
+{   // Package an in-process subscription request into an action and queue it on the core; qdrh_subscribe_CT performs the work.
+    qdr_action_t *action = qdr_action(qdrh_subscribe_CT);    // action whose handler is qdrh_subscribe_CT
+    action->args.subscribe.address    = qdr_field(address);  // wrapped as a qdr_field_t; the handler releases it with qdr_field_free
+    action->args.subscribe.semantics  = sem;                 // used by the handler only when it has to create the address record
+    action->args.subscribe.aclass     = aclass;              // address class; the handler applies it as the iterator prefix override
+    action->args.subscribe.phase      = phase;               // the handler applies this only when aclass is 'M'
+    action->args.subscribe.on_message = on_message;          // callback to be stored on the address record
+    action->args.subscribe.context    = context;             // stored next to the callback as on_message_context
+    qdr_action_enqueue(core, action);                        // hand the filled-in action over to the core
+}
+
+
 //==================================================================================
 // In-Thread Functions
 //==================================================================================
@@ -567,3 +582,36 @@ static void qdrh_unmap_destination_CT(qdr_core_t *core, qdr_action_t *action, bo
 }
 
 
+static void qdrh_subscribe_CT(qdr_core_t *core, qdr_action_t *action, bool discard)
+{   // Action handler for qdr_core_subscribe: find or create the address record and attach the in-process receive callback.
+    qdr_field_t *address = action->args.subscribe.address;
+
+    if (!discard) {  // when discard is set, nothing is looked up or modified; only the address field below is freed
+        char aclass         = action->args.subscribe.aclass;
+        char phase          = action->args.subscribe.phase;
+        qdr_address_t *addr = 0;  // remains 0 unless the hash lookup below stores a record here
+
+        qd_address_iterator_override_prefix(address->iterator, aclass);  // apply the caller-supplied address class to the iterator
+        if (aclass == 'M')  // the phase is applied for class 'M' only
+            qd_address_iterator_set_phase(address->iterator, phase);
+        qd_address_iterator_reset_view(address->iterator, ITER_VIEW_ADDRESS_HASH);  // switch to the hash view before using the iterator as a key
+
+        qd_hash_retrieve(core->addr_hash, address->iterator, (void**) &addr);  // look for an existing record under this key
+        if (!addr) {  // no record yet: create one with the requested semantics and register it
+            addr = qdr_address(action->args.subscribe.semantics);
+            qd_hash_insert(core->addr_hash, address->iterator, addr, &addr->hash_handle);  // index the new record; its handle is kept in addr->hash_handle
+            DEQ_ITEM_INIT(addr);
+            DEQ_INSERT_TAIL(core->addrs, addr);  // append the new record to the core's address list
+        }
+
+        if (!addr->on_message) {  // no in-process subscriber on this address yet: install the callback and its context
+            addr->on_message         = action->args.subscribe.on_message;
+            addr->on_message_context = action->args.subscribe.context;
+        } else  // a callback is already installed: leave it in place and report the conflict
+            qd_log(core->log, QD_LOG_CRITICAL,
+                   "qdr_core_subscribe: Multiple in-process subscriptions on the same address");
+    }
+
+    qdr_field_free(address);  // reached on both the normal and the discard path
+}
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/22c85683/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h
index 5dd1d84..5cb14a8 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -48,6 +48,9 @@ struct qdr_action_t {
     DEQ_LINKS(qdr_action_t);
     qdr_action_handler_t action_handler;
     union {
+        //
+        // Arguments for router control-plane actions
+        //
         struct {
             int                     link_maskbit;
             int                     router_maskbit;
@@ -58,9 +61,25 @@ struct qdr_action_t {
             char                    address_phase;
             qd_address_semantics_t  semantics;
         } route_table;
+
+        //
+        // Arguments for in-process subscriptions
+        //
+        struct {
+            qdr_field_t            *address;
+            qd_address_semantics_t  semantics;
+            char                    aclass;
+            char                    phase;
+            qdr_receive_t           on_message;
+            void                   *context;
+        } subscribe;
+
+        //
+        // Arguments for management-agent actions
+        //
         struct {
             qdr_query_t *query;
-           int          offset;
+            int          offset;
         } agent;
     } args;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org