You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/04/04 22:06:25 UTC

svn commit: r1584880 - in /qpid/dispatch/trunk: etc/ include/qpid/dispatch/ python/qpid_dispatch_internal/config/ python/qpid_dispatch_internal/router/ router/src/ src/ tests/ tools/

Author: tross
Date: Fri Apr  4 20:06:24 2014
New Revision: 1584880

URL: http://svn.apache.org/r1584880
Log:
DISPATCH-35 and DISPATCH-34 - Added basis for multi-phase transfer and on-demand connectors.

Modified:
    qpid/dispatch/trunk/etc/qdrouterd.conf
    qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
    qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py
    qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py
    qpid/dispatch/trunk/router/src/main.c
    qpid/dispatch/trunk/src/iterator.c
    qpid/dispatch/trunk/src/router_config.c
    qpid/dispatch/trunk/src/router_node.c
    qpid/dispatch/trunk/src/router_private.h
    qpid/dispatch/trunk/src/router_pynode.c
    qpid/dispatch/trunk/tests/field_test.c
    qpid/dispatch/trunk/tests/system_tests_two_routers.py
    qpid/dispatch/trunk/tools/qdstat.in

Modified: qpid/dispatch/trunk/etc/qdrouterd.conf
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/etc/qdrouterd.conf?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/etc/qdrouterd.conf (original)
+++ qpid/dispatch/trunk/etc/qdrouterd.conf Fri Apr  4 20:06:24 2014
@@ -53,6 +53,20 @@ fixed-address {
 }
 
 fixed-address {
+    prefix: /queue/
+    phase:  0
+    fanout: single
+    bias:   closest
+}
+
+fixed-address {
+    prefix: /queue/
+    phase:  1
+    fanout: single
+    bias:   closest
+}
+
+fixed-address {
     prefix: /
     fanout: multiple
 }

Modified: qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/iterator.h?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/iterator.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/iterator.h Fri Apr  4 20:06:24 2014
@@ -136,6 +136,8 @@ void qd_field_iterator_reset(qd_field_it
 void qd_field_iterator_reset_view(qd_field_iterator_t *iter,
                                   qd_iterator_view_t   view);
 
+void qd_field_iterator_set_phase(qd_field_iterator_t *iter, char phase);
+
 /**
  * Return the current octet in the iterator's view and step to the next.
  */

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py Fri Apr  4 20:06:24 2014
@@ -29,7 +29,7 @@ class Section:
     self.name           = name
     self.schema_section = schema_section
     self.values         = schema_section.check_and_default(kv_pairs)
-    self.index          = schema_section.index_of(kv_pairs)
+    self.index          = schema_section.index_of(self.values)
 
   def __repr__(self):
     return "%r" % self.values

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py Fri Apr  4 20:06:24 2014
@@ -58,7 +58,7 @@ config_schema = {
         'addr'              : (str,  0,    'M', None,  None),
         'port'              : (str,  1,    'M', None,  None),
         'label'             : (str,  None, '',  None,  None),
-        'role'              : (str,  None, '',  'normal', ['normal', 'inter-router']),
+        'role'              : (str,  None, '',  'normal', ['normal', 'inter-router', 'on-demand']),
         'sasl-mechanisms'   : (str,  None, 'M', None,  None),
         'ssl-profile'       : (str,  None, 'E', None,  None),
         'require-peer-auth' : (bool, None, '',  True,  None),

Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py Fri Apr  4 20:06:24 2014
@@ -183,8 +183,8 @@ class MobileAddressEngine(object):
         bit = self.node_tracker.maskbit_for_node(_id)
         if added != None:
             for a in added:
-                self.container.router_adapter.map_destination(a, bit)
+                self.container.router_adapter.map_destination(a[0], a[1:], bit)
         if deleted != None:
             for d in deleted:
-                self.container.router_adapter.unmap_destination(d, bit)
+                self.container.router_adapter.unmap_destination(d[0], d[1:], bit)
 

Modified: qpid/dispatch/trunk/router/src/main.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/router/src/main.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/router/src/main.c (original)
+++ qpid/dispatch/trunk/router/src/main.c Fri Apr  4 20:06:24 2014
@@ -31,9 +31,15 @@ static qd_dispatch_t *dispatch;
 static const char *app_config =
     "from qpid_dispatch_internal.config.schema import config_schema\n"
     "config_schema['fixed-address'] = (False, {\n"
-    "   'prefix' : (str, 0,    'M', None, None),\n"
+    "   'prefix' : (str, 0, 'M', None, None),\n"
+    "   'phase'  : (int, 1, '', 0, None),\n"
     "   'fanout' : (str, None, '', 'multiple', ['multiple', 'single']),\n"
-    "   'bias'   : (str, None, '', 'closest',  ['closest', 'spread'])})\n";
+    "   'bias'   : (str, None, '', 'closest',  ['closest', 'spread'])})\n"
+    "config_schema['waypoint'] = (False, {\n"
+    "   'name'      : (str, 0,    'M', None, None),\n"
+    "   'in-phase'  : (int, None, 'M', None, None),\n"
+    "   'out-phase' : (int, None, 'M', None, None),\n"
+    "   'connector' : (str, None, 'M', None, None)})\n";
 
 
 /**

Modified: qpid/dispatch/trunk/src/iterator.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/iterator.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/iterator.c (original)
+++ qpid/dispatch/trunk/src/iterator.c Fri Apr  4 20:06:24 2014
@@ -32,6 +32,12 @@ typedef enum {
     MODE_TO_SLASH
 } parse_mode_t;
 
+typedef enum {
+    STATE_AT_PREFIX,
+    STATE_AT_PHASE,
+    STATE_IN_ADDRESS
+} addr_state_t;
+
 typedef struct {
     qd_buffer_t   *buffer;
     unsigned char *cursor;
@@ -44,9 +50,10 @@ struct qd_field_iterator_t {
     pointer_t           pointer;
     qd_iterator_view_t  view;
     parse_mode_t        mode;
+    addr_state_t        state;
+    bool                view_prefix;
     unsigned char       prefix;
-    int                 at_prefix;
-    int                 view_prefix;
+    unsigned char       phase;
 };
 
 ALLOC_DECLARE(qd_field_iterator_t);
@@ -79,8 +86,8 @@ static void parse_address_view(qd_field_
     if (qd_field_iterator_prefix(iter, "_")) {
         if (qd_field_iterator_prefix(iter, "local/")) {
             iter->prefix      = 'L';
-            iter->at_prefix   = 1;
-            iter->view_prefix = 1;
+            iter->state       = STATE_AT_PREFIX;
+            iter->view_prefix = true;
             return;
         }
 
@@ -88,29 +95,29 @@ static void parse_address_view(qd_field_
             if (qd_field_iterator_prefix(iter, "all/") || qd_field_iterator_prefix(iter, my_area)) {
                 if (qd_field_iterator_prefix(iter, "all/") || qd_field_iterator_prefix(iter, my_router)) {
                     iter->prefix      = 'L';
-                    iter->at_prefix   = 1;
-                    iter->view_prefix = 1;
+                    iter->state       = STATE_AT_PREFIX;
+                    iter->view_prefix = true;
                     return;
                 }
 
                 iter->prefix      = 'R';
-                iter->at_prefix   = 1;
-                iter->view_prefix = 1;
+                iter->state       = STATE_AT_PREFIX;
+                iter->view_prefix = true;
                 iter->mode        = MODE_TO_SLASH;
                 return;
             }
 
             iter->prefix      = 'A';
-            iter->at_prefix   = 1;
-            iter->view_prefix = 1;
+            iter->state       = STATE_AT_PREFIX;
+            iter->view_prefix = true;
             iter->mode        = MODE_TO_SLASH;
             return;
         }
     }
 
     iter->prefix      = 'M';
-    iter->at_prefix   = 1;
-    iter->view_prefix = 1;
+    iter->state       = STATE_AT_PREFIX;
+    iter->view_prefix = true;
 }
 
 
@@ -124,15 +131,15 @@ static void parse_node_view(qd_field_ite
 
     if (qd_field_iterator_prefix(iter, my_area)) {
         iter->prefix      = 'R';
-        iter->at_prefix   = 1;
-        iter->view_prefix = 1;
+        iter->state       = STATE_AT_PREFIX;
+        iter->view_prefix = true;
         iter->mode        = MODE_TO_END;
         return;
     }
 
     iter->prefix      = 'A';
-    iter->at_prefix   = 1;
-    iter->view_prefix = 1;
+    iter->state       = STATE_AT_PREFIX;
+    iter->view_prefix = true;
     iter->mode        = MODE_TO_SLASH;
 }
 
@@ -143,8 +150,8 @@ static void view_initialize(qd_field_ite
     // The default behavior is for the view to *not* have a prefix.
     // We'll add one if it's needed later.
     //
-    iter->at_prefix   = 0;
-    iter->view_prefix = 0;
+    iter->state       = STATE_IN_ADDRESS;
+    iter->view_prefix = false;
     iter->mode        = MODE_TO_END;
 
     if (iter->view == ITER_VIEW_ALL)
@@ -272,6 +279,7 @@ qd_field_iterator_t* qd_field_iterator_s
     iter->start_pointer.buffer = 0;
     iter->start_pointer.cursor = (unsigned char*) text;
     iter->start_pointer.length = strlen(text);
+    iter->phase                = '0';
 
     qd_field_iterator_reset_view(iter, view);
 
@@ -288,6 +296,7 @@ qd_field_iterator_t* qd_field_iterator_b
     iter->start_pointer.buffer = 0;
     iter->start_pointer.cursor = (unsigned char*) text;
     iter->start_pointer.length = length;
+    iter->phase                = '0';
 
     qd_field_iterator_reset_view(iter, view);
 
@@ -304,6 +313,7 @@ qd_field_iterator_t *qd_field_iterator_b
     iter->start_pointer.buffer = buffer;
     iter->start_pointer.cursor = qd_buffer_base(buffer) + offset;
     iter->start_pointer.length = length;
+    iter->phase                = '0';
 
     qd_field_iterator_reset_view(iter, view);
 
@@ -319,8 +329,8 @@ void qd_field_iterator_free(qd_field_ite
 
 void qd_field_iterator_reset(qd_field_iterator_t *iter)
 {
-    iter->pointer   = iter->view_start_pointer;
-    iter->at_prefix = iter->view_prefix;
+    iter->pointer = iter->view_start_pointer;
+    iter->state   = iter->view_prefix ? STATE_AT_PREFIX : STATE_IN_ADDRESS;
 }
 
 
@@ -335,13 +345,24 @@ void qd_field_iterator_reset_view(qd_fie
 }
 
 
+void qd_field_iterator_set_phase(qd_field_iterator_t *iter, char phase)
+{
+    iter->phase = phase;
+}
+
+
 unsigned char qd_field_iterator_octet(qd_field_iterator_t *iter)
 {
-    if (iter->at_prefix) {
-        iter->at_prefix = 0;
+    if (iter->state == STATE_AT_PREFIX) {
+        iter->state =  iter->prefix == 'M' ? STATE_AT_PHASE : STATE_IN_ADDRESS;
         return iter->prefix;
     }
 
+    if (iter->state == STATE_AT_PHASE) {
+        iter->state = STATE_IN_ADDRESS;
+        return iter->phase;
+    }
+
     if (iter->pointer.length == 0)
         return (unsigned char) 0;
 
@@ -386,8 +407,9 @@ qd_field_iterator_t *qd_field_iterator_s
     sub->pointer              = sub->start_pointer;
     sub->view                 = iter->view;
     sub->mode                 = iter->mode;
-    sub->at_prefix            = 0;
-    sub->view_prefix          = 0;
+    sub->state                = STATE_IN_ADDRESS;
+    sub->view_prefix          = false;
+    sub->phase                = '0';
 
     return sub;
 }

Modified: qpid/dispatch/trunk/src/router_config.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_config.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_config.c (original)
+++ qpid/dispatch/trunk/src/router_config.c Fri Apr  4 20:06:24 2014
@@ -24,27 +24,57 @@
 #include "dispatch_private.h"
 #include "router_private.h"
 
-static const char *CONF_ADDRESS = "fixed-address";
+static const char *CONF_ADDRESS  = "fixed-address";
+static const char *CONF_WAYPOINT = "waypoint";
 
-void qd_router_configure(qd_router_t *router)
-{
-    if (!router->qd)
-        return;
 
+static void qd_router_configure_addresses(qd_router_t *router)
+{
     int count = qd_config_item_count(router->qd, CONF_ADDRESS);
 
-    router->config_addr_count = count;
-    router->config_addrs      = NEW_ARRAY(qd_config_address_t, count);
-
     for (int idx = 0; idx < count; idx++) {
         const char *prefix = qd_config_item_value_string(router->qd, CONF_ADDRESS, idx, "prefix");
+        int         phase  = qd_config_item_value_int(router->qd,    CONF_ADDRESS, idx, "phase");
         const char *fanout = qd_config_item_value_string(router->qd, CONF_ADDRESS, idx, "fanout");
         const char *bias   = qd_config_item_value_string(router->qd, CONF_ADDRESS, idx, "bias");
-        router->config_addrs[idx].prefix = (char*) malloc(strlen(prefix) + 1);
-        if (prefix[0] == '/')
-            strcpy(router->config_addrs[idx].prefix, &prefix[1]);
-        else
-            strcpy(router->config_addrs[idx].prefix, prefix);
+
+        if (phase < 0 || phase > 9) {
+            qd_log(router->log_source, QD_LOG_ERROR, "Phase for prefix '%s' must be between 0 and 9.  Ignoring", prefix);
+            continue;
+        }
+
+        //
+        // Search for a matching prefix in the list.
+        //
+        qd_config_address_t *addr = DEQ_HEAD(router->config_addrs);
+        while (addr) {
+            if (strcmp(addr->prefix, prefix) == 0)
+                break;
+            addr = DEQ_NEXT(addr);
+        }
+
+        if (addr == 0) {
+            //
+            // Create a new prefix
+            //
+            addr = NEW(qd_config_address_t);
+            DEQ_ITEM_INIT(addr);
+            addr->prefix = (char*) malloc(strlen(prefix) + 1);
+            addr->last_phase = (char) phase + '0';
+            DEQ_INIT(addr->phases);
+            DEQ_INSERT_TAIL(router->config_addrs, addr);
+            if (prefix[0] == '/')
+                strcpy(addr->prefix, &prefix[1]);
+            else
+                strcpy(addr->prefix, prefix);
+        }
+
+        //
+        // Add the phase to the prefix
+        //
+        qd_config_phase_t *addr_phase = NEW(qd_config_phase_t);
+        DEQ_ITEM_INIT(addr_phase);
+        addr_phase->phase = (char) phase + '0';
 
         qd_address_semantics_t semantics = 0;
         if      (strcmp("multiple", fanout) == 0) semantics |= QD_FANOUT_MULTIPLE;
@@ -57,13 +87,93 @@ void qd_router_configure(qd_router_t *ro
             else if (strcmp("spread",  bias) == 0) semantics |= QD_BIAS_SPREAD;
             else
                 assert(0);
-            qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s fanout=%s bias=%s", prefix, fanout, bias);
+            qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s phase=%d fanout=%s bias=%s",
+                   prefix, phase, fanout, bias);
         } else {
             semantics |= QD_BIAS_NONE;
-            qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s fanout=%s", prefix, fanout);
+            qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s phase=%d fanout=%s",
+                   prefix, phase, fanout);
         }
 
-        router->config_addrs[idx].semantics = semantics;
+        addr_phase->semantics = semantics;
+        addr->last_phase      = addr_phase->phase;
+        DEQ_INSERT_TAIL(addr->phases, addr_phase);
     }
 }
 
+
+static void qd_router_configure_waypoints(qd_router_t *router)
+{
+    int count = qd_config_item_count(router->qd, CONF_WAYPOINT);
+
+    for (int idx = 0; idx < count; idx++) {
+        const char *name      = qd_config_item_value_string(router->qd, CONF_WAYPOINT, idx, "name");
+        int         in_phase  = qd_config_item_value_int(router->qd,    CONF_WAYPOINT, idx, "in-phase");
+        int         out_phase = qd_config_item_value_int(router->qd,    CONF_WAYPOINT, idx, "out-phase");
+        //const char *connector = qd_config_item_value_string(router->qd, CONF_WAYPOINT, idx, "connector");
+
+        if (in_phase < 0 || in_phase > 9 || out_phase < 0 || out_phase > 9) {
+            qd_log(router->log_source, QD_LOG_ERROR, "Phases for waypoint '%s' must be between 0 and 9.  Ignoring", name);
+            continue;
+        }
+
+        qd_config_waypoint_t *waypoint = NEW(qd_config_waypoint_t);
+        DEQ_ITEM_INIT(waypoint);
+        waypoint->name = (char*) malloc(strlen(name) + 1);
+        strcpy(waypoint->name, name);
+        waypoint->in_phase  = (char) in_phase  + '0';
+        waypoint->out_phase = (char) out_phase + '0';
+        waypoint->connector = 0;
+        waypoint->in_link   = 0;
+        waypoint->out_link  = 0;
+
+        //
+        // TODO - Look up connector
+        //
+
+        DEQ_INSERT_TAIL(router->config_waypoints, waypoint);
+
+        qd_log(router->log_source, QD_LOG_INFO, "Configured Waypoint: name=%s in_phase=%d out_phase=%d",
+               name, in_phase, out_phase);
+    }
+}
+
+
+void qd_router_configure(qd_router_t *router)
+{
+    if (!router->qd)
+        return;
+    qd_router_configure_addresses(router);
+    qd_router_configure_waypoints(router);
+}
+
+
+qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
+                                                 char in_phase, char *out_phase)
+{
+    qd_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+
+    qd_config_address_t *addr  = DEQ_HEAD(router->config_addrs);
+    qd_config_phase_t   *phase = 0;
+
+    while (addr) {
+        if (qd_field_iterator_prefix(iter, addr->prefix))
+            break;
+        qd_field_iterator_reset(iter);
+        addr = DEQ_NEXT(addr);
+    }
+
+    if (addr) {
+        *out_phase = in_phase == '\0' ? addr->last_phase : in_phase; 
+        phase = DEQ_HEAD(addr->phases);
+        while (phase) {
+            if (phase->phase == *out_phase)
+                break;
+            phase = DEQ_NEXT(phase);
+        }
+    }
+
+    return phase ? phase->semantics : QD_SEMANTICS_DEFAULT;
+}
+
+

Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Apr  4 20:06:24 2014
@@ -955,20 +955,6 @@ static int router_incoming_link_handler(
 }
 
 
-qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter)
-{
-    qd_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
-
-    for (int idx = 0; idx < router->config_addr_count; idx++) {
-        if (qd_field_iterator_prefix(iter, router->config_addrs[idx].prefix))
-            return router->config_addrs[idx].semantics;
-        qd_field_iterator_reset(iter);
-    }
-
-    return QD_SEMANTICS_DEFAULT;
-}
-
-
 /**
  * New Outgoing Link Handler
  */
@@ -981,6 +967,7 @@ static int router_outgoing_link_handler(
     int is_router        = qd_router_terminus_is_router(qd_link_remote_target(link));
     int propagate        = 0;
     qd_field_iterator_t    *iter = 0;
+    char                    phase = '0';
     qd_address_semantics_t  semantics;
 
     if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
@@ -1043,7 +1030,8 @@ static int router_outgoing_link_handler(
     if (is_dynamic || !iter)
         semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_BACKPRESSURE;
     else {
-        semantics = router_semantics_for_addr(router, iter);
+        semantics = router_semantics_for_addr(router, iter, '\0', &phase);
+        qd_field_iterator_set_phase(iter, phase);
         qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
     }
 
@@ -1071,9 +1059,9 @@ static int router_outgoing_link_handler(
             qd_router_generate_temp_addr(router, temp_addr, 1000);
             iter = qd_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
             pn_terminus_set_address(qd_link_source(link), temp_addr);
-            qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address: %s", temp_addr);
+            qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
         } else
-            qd_log(router->log_source, QD_LOG_INFO, "Registered local address: %s", r_src);
+            qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
 
         qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
         if (!addr) {
@@ -1364,8 +1352,8 @@ qd_router_t *qd_router(qd_dispatch_t *qd
     router->lock               = sys_mutex();
     router->timer              = qd_timer(qd, qd_router_timer_handler, (void*) router);
     router->dtag               = 1;
-    router->config_addrs       = 0;
-    router->config_addr_count  = 0;
+    DEQ_INIT(router->config_addrs);
+    DEQ_INIT(router->config_waypoints);
 
     //
     // Configure the router from the configuration file
@@ -1444,7 +1432,7 @@ qd_address_t *qd_router_register_address
     qd_address_t        *addr;
     qd_field_iterator_t *iter;
 
-    strcpy(addr_string, global ? "M" : "L");
+    strcpy(addr_string, global ? "M0" : "L");
     strcat(addr_string, address);
     iter = qd_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
 

Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Fri Apr  4 20:06:24 2014
@@ -152,42 +152,70 @@ ALLOC_DECLARE(qd_address_t);
 DEQ_DECLARE(qd_address_t, qd_address_list_t);
 
 
-typedef struct {
-    char                   *prefix;
+typedef struct qd_config_phase_t qd_config_phase_t;
+typedef struct qd_config_address_t qd_config_address_t;
+typedef struct qd_config_waypoint_t qd_config_waypoint_t;
+
+struct qd_config_phase_t {
+    DEQ_LINKS(qd_config_phase_t);
+    char                    phase;
     qd_address_semantics_t  semantics;
-} qd_config_address_t;
+};
+
+DEQ_DECLARE(qd_config_phase_t, qd_config_phase_list_t);
+
+struct qd_config_address_t {
+    DEQ_LINKS(qd_config_address_t);
+    char                   *prefix;
+    char                    last_phase;
+    qd_config_phase_list_t  phases;
+};
+
+DEQ_DECLARE(qd_config_address_t, qd_config_address_list_t);
+
+struct qd_config_waypoint_t {
+    DEQ_LINKS(qd_config_waypoint_t);
+    char           *name;
+    char            in_phase;
+    char            out_phase;
+    qd_connector_t *connector;
+    qd_link_t      *in_link;
+    qd_link_t      *out_link;
+};
+
+DEQ_DECLARE(qd_config_waypoint_t, qd_config_waypoint_list_t);
 
 
 struct qd_router_t {
-    qd_dispatch_t          *qd;
-    qd_log_source_t        *log_source;
-    qd_router_mode_t        router_mode;
-    const char             *router_area;
-    const char             *router_id;
-    qd_node_t              *node;
-
-    qd_address_list_t       addrs;
-    qd_hash_t              *addr_hash;
-    qd_address_t           *router_addr;
-    qd_address_t           *hello_addr;
-
-    qd_router_link_list_t   links;
-    qd_router_node_list_t   routers;
-    qd_router_link_t      **out_links_by_mask_bit;
-    qd_router_node_t      **routers_by_mask_bit;
-
-    qd_bitmask_t           *neighbor_free_mask;
-    sys_mutex_t            *lock;
-    qd_timer_t             *timer;
-    uint64_t                dtag;
-
-    qd_config_address_t    *config_addrs;
-    int                     config_addr_count;
-
-    qd_agent_class_t       *class_router;
-    qd_agent_class_t       *class_link;
-    qd_agent_class_t       *class_node;
-    qd_agent_class_t       *class_address;
+    qd_dispatch_t            *qd;
+    qd_log_source_t          *log_source;
+    qd_router_mode_t          router_mode;
+    const char               *router_area;
+    const char               *router_id;
+    qd_node_t                *node;
+
+    qd_address_list_t         addrs;
+    qd_hash_t                *addr_hash;
+    qd_address_t             *router_addr;
+    qd_address_t             *hello_addr;
+
+    qd_router_link_list_t     links;
+    qd_router_node_list_t     routers;
+    qd_router_link_t        **out_links_by_mask_bit;
+    qd_router_node_t        **routers_by_mask_bit;
+
+    qd_bitmask_t             *neighbor_free_mask;
+    sys_mutex_t              *lock;
+    qd_timer_t               *timer;
+    uint64_t                  dtag;
+
+    qd_config_address_list_t  config_addrs;
+    qd_config_waypoint_list_t config_waypoints;
+
+    qd_agent_class_t         *class_router;
+    qd_agent_class_t         *class_link;
+    qd_agent_class_t         *class_node;
+    qd_agent_class_t         *class_address;
 };
 
 
@@ -203,4 +231,6 @@ void qd_router_mobile_added(qd_router_t 
 void qd_router_mobile_removed(qd_router_t *router, const char *addr);
 void qd_router_link_lost(qd_router_t *router, int link_mask_bit);
 
+qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
+                                                 char in_phase, char *out_phase);
 #endif

Modified: qpid/dispatch/trunk/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_pynode.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_pynode.c (original)
+++ qpid/dispatch/trunk/src/router_pynode.c Fri Apr  4 20:06:24 2014
@@ -27,7 +27,6 @@
 #include "router_private.h"
 
 static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
-static qd_address_semantics_t default_semantics     = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS;
 
 static qd_log_source_t *log_source = 0;
 static PyObject        *pyRouter   = 0;
@@ -360,12 +359,14 @@ static PyObject* qd_map_destination(PyOb
 {
     RouterAdapter       *adapter = (RouterAdapter*) self;
     qd_router_t         *router  = adapter->router;
+    char                 phase;
+    char                 unused;
     const char          *addr_string;
     int                  maskbit;
     qd_address_t        *addr;
     qd_field_iterator_t *iter;
 
-    if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+    if (!PyArg_ParseTuple(args, "csi", &phase, &addr_string, &maskbit))
         return 0;
 
     if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -379,6 +380,7 @@ static PyObject* qd_map_destination(PyOb
     }
 
     iter = qd_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+    qd_field_iterator_set_phase(iter, phase);
 
     sys_mutex_lock(router->lock);
     qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
@@ -388,8 +390,8 @@ static PyObject* qd_map_destination(PyOb
         DEQ_ITEM_INIT(addr);
         DEQ_INIT(addr->rlinks);
         DEQ_INIT(addr->rnodes);
-        addr->semantics = default_semantics; // FIXME - Add provisioned semantics here.
         qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+        addr->semantics = router_semantics_for_addr(router, iter, phase, &unused);
         DEQ_ITEM_INIT(addr);
         DEQ_INSERT_TAIL(router->addrs, addr);
     }
@@ -411,11 +413,12 @@ static PyObject* qd_unmap_destination(Py
 {
     RouterAdapter *adapter = (RouterAdapter*) self;
     qd_router_t   *router  = adapter->router;
+    char           phase;
     const char    *addr_string;
     int            maskbit;
     qd_address_t  *addr;
 
-    if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+    if (!PyArg_ParseTuple(args, "csi", &phase, &addr_string, &maskbit))
         return 0;
 
     if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -430,6 +433,7 @@ static PyObject* qd_unmap_destination(Py
 
     qd_router_node_t    *rnode = router->routers_by_mask_bit[maskbit];
     qd_field_iterator_t *iter  = qd_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+    qd_field_iterator_set_phase(iter, phase);
 
     sys_mutex_lock(router->lock);
     qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);

Modified: qpid/dispatch/trunk/tests/field_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/field_test.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/field_test.c (original)
+++ qpid/dispatch/trunk/tests/field_test.c Fri Apr  4 20:06:24 2014
@@ -44,7 +44,7 @@ static char* test_view_global_dns(void *
         return "ITER_VIEW_NODE_SPECIFIC failed";
 
     qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-    if (!qd_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+    if (!qd_field_iterator_equal(iter, (unsigned char*) "M0global/sub"))
         return "ITER_VIEW_ADDRESS_HASH failed";
 
     return 0;
@@ -70,7 +70,7 @@ static char* test_view_global_non_dns(vo
         return "ITER_VIEW_NODE_SPECIFIC failed";
 
     qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-    if (!qd_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+    if (!qd_field_iterator_equal(iter, (unsigned char*) "M0global/sub"))
         return "ITER_VIEW_ADDRESS_HASH failed";
 
     return 0;
@@ -96,7 +96,7 @@ static char* test_view_global_no_host(vo
         return "ITER_VIEW_NODE_SPECIFIC failed";
 
     qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
-    if (!qd_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+    if (!qd_field_iterator_equal(iter, (unsigned char*) "M0global/sub"))
         return "ITER_VIEW_ADDRESS_HASH failed";
 
     return 0;
@@ -119,12 +119,14 @@ static char* test_view_address_hash(void
     {"_topo/my-area/router/my-addr",            "Rrouter"},
     {"_topo/my-area/my-router/my-addr",         "Lmy-addr"},
     {"_topo/my-area/router",                    "Rrouter"},
+    {"amqp:/mobile",                            "M1mobile"},
     {0, 0}
     };
     int idx;
 
     for (idx = 0; cases[idx].addr; idx++) {
         qd_field_iterator_t *iter = qd_field_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_HASH);
+        qd_field_iterator_set_phase(iter, '1');
         if (!qd_field_iterator_equal(iter, (unsigned char*) cases[idx].view)) {
             char *got = (char*) qd_field_iterator_copy(iter);
             snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed.  Expected '%s', got '%s'",

Modified: qpid/dispatch/trunk/tests/system_tests_two_routers.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_two_routers.py?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_two_routers.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_two_routers.py Fri Apr  4 20:06:24 2014
@@ -40,7 +40,7 @@ def wait_for_addr(messenger, addr, local
         messenger.recv()
         messenger.get(rsp)
         for item in rsp.body:
-            if item['addr'][1:] == addr and \
+            if item['addr'][2:] == addr and \
                local_count == item['subscriber-count'] and \
                remote_count == item['remote-count']:
                 done = True

Modified: qpid/dispatch/trunk/tools/qdstat.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdstat.in?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdstat.in (original)
+++ qpid/dispatch/trunk/tools/qdstat.in Fri Apr  4 20:06:24 2014
@@ -170,7 +170,17 @@ class BusManager:
     def _addr_text(self, addr):
         if not addr:
             return "-"
-        return addr[1:]
+        if addr[0] == 'M':
+            return addr[2:]
+        else:
+            return addr[1:]
+
+    def _addr_phase(self, addr):
+        if not addr:
+            return "-"
+        if addr[0] == 'M':
+            return addr[1]
+        return ''
 
     def displayGeneral(self):
         disp = Display(prefix="  ")
@@ -268,6 +278,7 @@ class BusManager:
         heads = []
         heads.append(Header("class"))
         heads.append(Header("address"))
+        heads.append(Header("phase"))
         heads.append(Header("in-proc", Header.Y))
         heads.append(Header("local", Header.COMMAS))
         heads.append(Header("remote", Header.COMMAS))
@@ -284,6 +295,7 @@ class BusManager:
             row = []
             row.append(self._addr_class(addr['addr']))
             row.append(self._addr_text(addr['addr']))
+            row.append(self._addr_phase(addr['addr']))
             row.append(addr['in-process'])
             row.append(addr['subscriber-count'])
             row.append(addr['remote-count'])



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org