You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2016/03/31 19:00:33 UTC

qpid-dispatch git commit: DISPATCH-127 - Carry the ingress address phase across inter-router links so waypoints can be accessed from remote routers.

Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 2a03648c8 -> 9d08d0a67


DISPATCH-127 - Carry the ingress address phase across inter-router links so waypoints can be
accessed from remote routers.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/9d08d0a6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/9d08d0a6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/9d08d0a6

Branch: refs/heads/master
Commit: 9d08d0a678d71c275ab0c05d4f28b1549630e132
Parents: 2a03648
Author: Ted Ross <tr...@redhat.com>
Authored: Thu Mar 31 12:59:14 2016 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu Mar 31 12:59:14 2016 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/amqp.h        |  1 +
 include/qpid/dispatch/message.h     | 10 ++++++++++
 include/qpid/dispatch/router_core.h | 11 +++++++++++
 src/amqp.c                          |  1 +
 src/message.c                       | 19 ++++++++++++++++++
 src/message_private.h               |  2 +-
 src/router_core/connections.c       |  6 ++++++
 src/router_node.c                   | 33 +++++++++++++++++++++++++++-----
 tests/system_tests_link_routes.py   |  5 -----
 9 files changed, 77 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/amqp.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/amqp.h b/include/qpid/dispatch/amqp.h
index d4bc340..54773b5 100644
--- a/include/qpid/dispatch/amqp.h
+++ b/include/qpid/dispatch/amqp.h
@@ -94,6 +94,7 @@ enum {
 extern const char * const QD_MA_INGRESS;  ///< Ingress Router
 extern const char * const QD_MA_TRACE;    ///< Trace
 extern const char * const QD_MA_TO;       ///< To-Override
+extern const char * const QD_MA_PHASE;    ///< Phase for override address
 extern const char * const QD_MA_CLASS;    ///< Message-Class
 /// @}
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 7ab7b9c..66601ba 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -176,6 +176,16 @@ void qd_message_set_trace_annotation(qd_message_t *msg, qd_composed_field_t *tra
 void qd_message_set_to_override_annotation(qd_message_t *msg, qd_composed_field_t *to_field);
 
 /**
+ * Set the address phase recorded in the message's phase annotation.
+ *
+ * @param msg Pointer to an outgoing message.
+ * @param phase The phase of the address for the outgoing message.
+ *
+ */
+void qd_message_set_phase_annotation(qd_message_t *msg, int phase);
+int  qd_message_get_phase_annotation(const qd_message_t *msg);
+
+/**
  * Set the value for the QD_MA_INGRESS field in the outgoing message
  * annotations for the message.
  *

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h
index 6e26b41..3956b5c 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -382,6 +382,17 @@ qd_link_type_t qdr_link_type(const qdr_link_t *link);
 qd_direction_t qdr_link_direction(const qdr_link_t *link);
 
 /**
+ * qdr_link_phase
+ *
+ * If this link is associated with an auto_link, return the address phase.  Otherwise
+ * return zero.
+ *
+ * @param link Link object
+ * @return 0 or the phase of the link's auto_link.
+ */
+int qdr_link_phase(const qdr_link_t *link);
+
+/**
  * qdr_link_is_anonymous
  *
  * Indicate whether the link is anonymous.  Note that this is determined inside

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/amqp.c
----------------------------------------------------------------------
diff --git a/src/amqp.c b/src/amqp.c
index c02216d..656f7ef 100644
--- a/src/amqp.c
+++ b/src/amqp.c
@@ -22,6 +22,7 @@
 const char * const QD_MA_INGRESS = "x-opt-qd.ingress";
 const char * const QD_MA_TRACE   = "x-opt-qd.trace";
 const char * const QD_MA_TO      = "x-opt-qd.to";
+const char * const QD_MA_PHASE   = "x-opt-qd.phase";
 const char * const QD_MA_CLASS   = "x-opt-qd.class";
 
 const char * const QD_CAPABILITY_ROUTER_CONTROL  = "qd.router";

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index c8ade93..c5dc028 100644
--- a/src/message.c
+++ b/src/message.c
@@ -547,6 +547,7 @@ qd_message_t *qd_message()
     DEQ_INIT(msg->ma_to_override);
     DEQ_INIT(msg->ma_trace);
     DEQ_INIT(msg->ma_ingress);
+    msg->ma_phase = 0;
     msg->content = new_qd_message_content_t();
 
     if (msg->content == 0) {
@@ -612,6 +613,7 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
     qd_buffer_list_clone(&copy->ma_to_override, &msg->ma_to_override);
     qd_buffer_list_clone(&copy->ma_trace, &msg->ma_trace);
     qd_buffer_list_clone(&copy->ma_ingress, &msg->ma_ingress);
+    copy->ma_phase = msg->ma_phase;
 
     copy->content = content;
 
@@ -666,6 +668,18 @@ void qd_message_set_to_override_annotation(qd_message_t *in_msg, qd_composed_fie
     qd_compose_free(to_field);
 }
 
+void qd_message_set_phase_annotation(qd_message_t *in_msg, int phase)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    msg->ma_phase = phase;
+}
+
+int qd_message_get_phase_annotation(const qd_message_t *in_msg)
+{
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    return msg->ma_phase;
+}
+
 void qd_message_set_ingress_annotation(qd_message_t *in_msg, qd_composed_field_t *ingress_field)
 {
     qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
@@ -790,6 +804,11 @@ static bool compose_message_annotations(qd_message_pvt_t *msg, qd_buffer_list_t
             qd_compose_insert_buffers(out_ma, &msg->ma_ingress);
         }
 
+        if (msg->ma_phase != 0) {
+            qd_compose_insert_symbol(out_ma, QD_MA_PHASE);
+            qd_compose_insert_int(out_ma, msg->ma_phase);
+        }
+
         qd_compose_end_map(out_ma);
 
         qd_compose_take_buffers(out_ma, out);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index f09fa25..8ede2c7 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -93,7 +93,7 @@ typedef struct {
     qd_buffer_list_t      ma_to_override;  // to field in outgoing message annotations.
     qd_buffer_list_t      ma_trace;        // trace list in outgoing message annotations
     qd_buffer_list_t      ma_ingress;      // ingress field in outgoing message annotations
-
+    int                   ma_phase;        // phase for the override address
 } qd_message_pvt_t;
 
 ALLOC_DECLARE(qd_message_t);

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index 4cfc8cd..9b6e63b 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -217,6 +217,12 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link)
 }
 
 
+int qdr_link_phase(const qdr_link_t *link)
+{
+    return link && link->auto_link ? link->auto_link->phase : 0;
+}
+
+
 bool qdr_link_is_anonymous(const qdr_link_t *link)
 {
     return link->owning_addr == 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index 4bc5ad9..41a8d9a 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -94,6 +94,7 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t       *router,
     qd_parsed_field_t *trace   = 0;
     qd_parsed_field_t *ingress = 0;
     qd_parsed_field_t *to      = 0;
+    qd_parsed_field_t *phase   = 0;
 
     *link_exclusions = 0;
 
@@ -115,8 +116,10 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t       *router,
                 ingress = qd_parse_sub_value(in_ma, idx);
             } else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_TO)) {
                 to = qd_parse_sub_value(in_ma, idx);
+            } else if (qd_field_iterator_equal(iter, (unsigned char*) QD_MA_PHASE)) {
+                phase = qd_parse_sub_value(in_ma, idx);
             }
-            done = trace && ingress && to;
+            done = trace && ingress && to && phase;
         }
     }
 
@@ -166,6 +169,15 @@ static qd_field_iterator_t *router_annotate_message(qd_router_t       *router,
     }
 
     //
+    // QD_MA_PHASE:
+    // Preserve the existing value.
+    //
+    if (phase) {
+        int phase_val = qd_parse_as_int(phase);
+        qd_message_set_phase_annotation(msg, phase_val);
+    }
+
+    //
     // QD_MA_INGRESS:
     // If there is no ingress field, annotate the ingress as
     // this router else keep the original field.
@@ -264,14 +276,17 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
 
         if (anonymous_link) {
             qd_field_iterator_t *addr_iter = 0;
+            int phase = 0;
             
             //
             // If the message has delivery annotations, get the to-override field from the annotations.
             //
             if (in_ma) {
                 qd_parsed_field_t *ma_to = qd_parse_value_by_key(in_ma, QD_MA_TO);
-                if (ma_to)
+                if (ma_to) {
                     addr_iter = qd_field_iterator_dup(qd_parse_raw(ma_to));
+                    phase = qd_message_get_phase_annotation(msg);
+                }
             }
 
             //
@@ -282,15 +297,23 @@ static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
 
             if (addr_iter) {
                 qd_address_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
+                if (phase > 0)
+                    qd_address_iterator_set_phase(addr_iter, '0' + (char) phase);
                 delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, addr_iter, pn_delivery_settled(pnd),
                                                link_exclusions);
             }
         } else {
-            const char *r_tgt = pn_terminus_get_address(qd_link_remote_target(link));
-            if (r_tgt) {
+            const char *term_addr = pn_terminus_get_address(qd_link_remote_target(link));
+            if (!term_addr)
+                term_addr = pn_terminus_get_address(qd_link_source(link));
+
+            if (term_addr) {
                 qd_composed_field_t *to_override = qd_compose_subfield(0);
-                qd_compose_insert_string(to_override, r_tgt);
+                qd_compose_insert_string(to_override, term_addr);
                 qd_message_set_to_override_annotation(msg, to_override);
+                int phase = qdr_link_phase(rlink);
+                if (phase != 0)
+                    qd_message_set_phase_annotation(msg, phase);
             }
             delivery = qdr_link_deliver(rlink, msg, ingress_iter, pn_delivery_settled(pnd), link_exclusions);
         }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/9d08d0a6/tests/system_tests_link_routes.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_link_routes.py b/tests/system_tests_link_routes.py
index 89a3fa7..6f3ca61 100644
--- a/tests/system_tests_link_routes.py
+++ b/tests/system_tests_link_routes.py
@@ -126,11 +126,8 @@ class LinkRoutePatternTest(TestCase):
         """
         out = self.run_qdstat_linkRoute(self.routers[1].addresses[0])
         out_list = out.split()
-        self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2)
-        self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2)
         self.assertEqual(out_list.count('in'), 1)
         self.assertEqual(out_list.count('out'), 1)
-        self.assertEqual(out_list.count('broker'), 2)
 
     def test_ccc_qdstat_link_routes_routerC(self):
         """
@@ -140,8 +137,6 @@ class LinkRoutePatternTest(TestCase):
         out = self.run_qdstat_linkRoute(self.routers[2].addresses[1])
         out_list = out.split()
 
-        self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2)
-        self.assertEqual(out_list.count('org.apache.qpid.dispatch.config.linkRoute'), 2)
         self.assertEqual(out_list.count('in'), 1)
         self.assertEqual(out_list.count('out'), 1)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org