You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2014/04/04 22:06:25 UTC
svn commit: r1584880 - in /qpid/dispatch/trunk: etc/ include/qpid/dispatch/
python/qpid_dispatch_internal/config/ python/qpid_dispatch_internal/router/
router/src/ src/ tests/ tools/
Author: tross
Date: Fri Apr 4 20:06:24 2014
New Revision: 1584880
URL: http://svn.apache.org/r1584880
Log:
DISPATCH-35 and DISPATCH-34 - Added basis for multi-phase transfer and on-demand connectors.
Modified:
qpid/dispatch/trunk/etc/qdrouterd.conf
qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py
qpid/dispatch/trunk/router/src/main.c
qpid/dispatch/trunk/src/iterator.c
qpid/dispatch/trunk/src/router_config.c
qpid/dispatch/trunk/src/router_node.c
qpid/dispatch/trunk/src/router_private.h
qpid/dispatch/trunk/src/router_pynode.c
qpid/dispatch/trunk/tests/field_test.c
qpid/dispatch/trunk/tests/system_tests_two_routers.py
qpid/dispatch/trunk/tools/qdstat.in
Modified: qpid/dispatch/trunk/etc/qdrouterd.conf
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/etc/qdrouterd.conf?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/etc/qdrouterd.conf (original)
+++ qpid/dispatch/trunk/etc/qdrouterd.conf Fri Apr 4 20:06:24 2014
@@ -53,6 +53,20 @@ fixed-address {
}
fixed-address {
+ prefix: /queue/
+ phase: 0
+ fanout: single
+ bias: closest
+}
+
+fixed-address {
+ prefix: /queue/
+ phase: 1
+ fanout: single
+ bias: closest
+}
+
+fixed-address {
prefix: /
fanout: multiple
}
Modified: qpid/dispatch/trunk/include/qpid/dispatch/iterator.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/include/qpid/dispatch/iterator.h?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/include/qpid/dispatch/iterator.h (original)
+++ qpid/dispatch/trunk/include/qpid/dispatch/iterator.h Fri Apr 4 20:06:24 2014
@@ -136,6 +136,8 @@ void qd_field_iterator_reset(qd_field_it
void qd_field_iterator_reset_view(qd_field_iterator_t *iter,
qd_iterator_view_t view);
+void qd_field_iterator_set_phase(qd_field_iterator_t *iter, char phase);
+
/**
* Return the current octet in the iterator's view and step to the next.
*/
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/config/parser.py Fri Apr 4 20:06:24 2014
@@ -29,7 +29,7 @@ class Section:
self.name = name
self.schema_section = schema_section
self.values = schema_section.check_and_default(kv_pairs)
- self.index = schema_section.index_of(kv_pairs)
+ self.index = schema_section.index_of(self.values)
def __repr__(self):
return "%r" % self.values
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/config/schema.py Fri Apr 4 20:06:24 2014
@@ -58,7 +58,7 @@ config_schema = {
'addr' : (str, 0, 'M', None, None),
'port' : (str, 1, 'M', None, None),
'label' : (str, None, '', None, None),
- 'role' : (str, None, '', 'normal', ['normal', 'inter-router']),
+ 'role' : (str, None, '', 'normal', ['normal', 'inter-router', 'on-demand']),
'sasl-mechanisms' : (str, None, 'M', None, None),
'ssl-profile' : (str, None, 'E', None, None),
'require-peer-auth' : (bool, None, '', True, None),
Modified: qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch_internal/router/mobile.py Fri Apr 4 20:06:24 2014
@@ -183,8 +183,8 @@ class MobileAddressEngine(object):
bit = self.node_tracker.maskbit_for_node(_id)
if added != None:
for a in added:
- self.container.router_adapter.map_destination(a, bit)
+ self.container.router_adapter.map_destination(a[0], a[1:], bit)
if deleted != None:
for d in deleted:
- self.container.router_adapter.unmap_destination(d, bit)
+ self.container.router_adapter.unmap_destination(d[0], d[1:], bit)
Modified: qpid/dispatch/trunk/router/src/main.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/router/src/main.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/router/src/main.c (original)
+++ qpid/dispatch/trunk/router/src/main.c Fri Apr 4 20:06:24 2014
@@ -31,9 +31,15 @@ static qd_dispatch_t *dispatch;
static const char *app_config =
"from qpid_dispatch_internal.config.schema import config_schema\n"
"config_schema['fixed-address'] = (False, {\n"
- " 'prefix' : (str, 0, 'M', None, None),\n"
+ " 'prefix' : (str, 0, 'M', None, None),\n"
+ " 'phase' : (int, 1, '', 0, None),\n"
" 'fanout' : (str, None, '', 'multiple', ['multiple', 'single']),\n"
- " 'bias' : (str, None, '', 'closest', ['closest', 'spread'])})\n";
+ " 'bias' : (str, None, '', 'closest', ['closest', 'spread'])})\n"
+ "config_schema['waypoint'] = (False, {\n"
+ " 'name' : (str, 0, 'M', None, None),\n"
+ " 'in-phase' : (int, None, 'M', None, None),\n"
+ " 'out-phase' : (int, None, 'M', None, None),\n"
+ " 'connector' : (str, None, 'M', None, None)})\n";
/**
Modified: qpid/dispatch/trunk/src/iterator.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/iterator.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/iterator.c (original)
+++ qpid/dispatch/trunk/src/iterator.c Fri Apr 4 20:06:24 2014
@@ -32,6 +32,12 @@ typedef enum {
MODE_TO_SLASH
} parse_mode_t;
+typedef enum {
+ STATE_AT_PREFIX,
+ STATE_AT_PHASE,
+ STATE_IN_ADDRESS
+} addr_state_t;
+
typedef struct {
qd_buffer_t *buffer;
unsigned char *cursor;
@@ -44,9 +50,10 @@ struct qd_field_iterator_t {
pointer_t pointer;
qd_iterator_view_t view;
parse_mode_t mode;
+ addr_state_t state;
+ bool view_prefix;
unsigned char prefix;
- int at_prefix;
- int view_prefix;
+ unsigned char phase;
};
ALLOC_DECLARE(qd_field_iterator_t);
@@ -79,8 +86,8 @@ static void parse_address_view(qd_field_
if (qd_field_iterator_prefix(iter, "_")) {
if (qd_field_iterator_prefix(iter, "local/")) {
iter->prefix = 'L';
- iter->at_prefix = 1;
- iter->view_prefix = 1;
+ iter->state = STATE_AT_PREFIX;
+ iter->view_prefix = true;
return;
}
@@ -88,29 +95,29 @@ static void parse_address_view(qd_field_
if (qd_field_iterator_prefix(iter, "all/") || qd_field_iterator_prefix(iter, my_area)) {
if (qd_field_iterator_prefix(iter, "all/") || qd_field_iterator_prefix(iter, my_router)) {
iter->prefix = 'L';
- iter->at_prefix = 1;
- iter->view_prefix = 1;
+ iter->state = STATE_AT_PREFIX;
+ iter->view_prefix = true;
return;
}
iter->prefix = 'R';
- iter->at_prefix = 1;
- iter->view_prefix = 1;
+ iter->state = STATE_AT_PREFIX;
+ iter->view_prefix = true;
iter->mode = MODE_TO_SLASH;
return;
}
iter->prefix = 'A';
- iter->at_prefix = 1;
- iter->view_prefix = 1;
+ iter->state = STATE_AT_PREFIX;
+ iter->view_prefix = true;
iter->mode = MODE_TO_SLASH;
return;
}
}
iter->prefix = 'M';
- iter->at_prefix = 1;
- iter->view_prefix = 1;
+ iter->state = STATE_AT_PREFIX;
+ iter->view_prefix = true;
}
@@ -124,15 +131,15 @@ static void parse_node_view(qd_field_ite
if (qd_field_iterator_prefix(iter, my_area)) {
iter->prefix = 'R';
- iter->at_prefix = 1;
- iter->view_prefix = 1;
+ iter->state = STATE_AT_PREFIX;
+ iter->view_prefix = true;
iter->mode = MODE_TO_END;
return;
}
iter->prefix = 'A';
- iter->at_prefix = 1;
- iter->view_prefix = 1;
+ iter->state = STATE_AT_PREFIX;
+ iter->view_prefix = true;
iter->mode = MODE_TO_SLASH;
}
@@ -143,8 +150,8 @@ static void view_initialize(qd_field_ite
// The default behavior is for the view to *not* have a prefix.
// We'll add one if it's needed later.
//
- iter->at_prefix = 0;
- iter->view_prefix = 0;
+ iter->state = STATE_IN_ADDRESS;
+ iter->view_prefix = false;
iter->mode = MODE_TO_END;
if (iter->view == ITER_VIEW_ALL)
@@ -272,6 +279,7 @@ qd_field_iterator_t* qd_field_iterator_s
iter->start_pointer.buffer = 0;
iter->start_pointer.cursor = (unsigned char*) text;
iter->start_pointer.length = strlen(text);
+ iter->phase = '0';
qd_field_iterator_reset_view(iter, view);
@@ -288,6 +296,7 @@ qd_field_iterator_t* qd_field_iterator_b
iter->start_pointer.buffer = 0;
iter->start_pointer.cursor = (unsigned char*) text;
iter->start_pointer.length = length;
+ iter->phase = '0';
qd_field_iterator_reset_view(iter, view);
@@ -304,6 +313,7 @@ qd_field_iterator_t *qd_field_iterator_b
iter->start_pointer.buffer = buffer;
iter->start_pointer.cursor = qd_buffer_base(buffer) + offset;
iter->start_pointer.length = length;
+ iter->phase = '0';
qd_field_iterator_reset_view(iter, view);
@@ -319,8 +329,8 @@ void qd_field_iterator_free(qd_field_ite
void qd_field_iterator_reset(qd_field_iterator_t *iter)
{
- iter->pointer = iter->view_start_pointer;
- iter->at_prefix = iter->view_prefix;
+ iter->pointer = iter->view_start_pointer;
+ iter->state = iter->view_prefix ? STATE_AT_PREFIX : STATE_IN_ADDRESS;
}
@@ -335,13 +345,24 @@ void qd_field_iterator_reset_view(qd_fie
}
+void qd_field_iterator_set_phase(qd_field_iterator_t *iter, char phase)
+{
+ iter->phase = phase;
+}
+
+
unsigned char qd_field_iterator_octet(qd_field_iterator_t *iter)
{
- if (iter->at_prefix) {
- iter->at_prefix = 0;
+ if (iter->state == STATE_AT_PREFIX) {
+ iter->state = iter->prefix == 'M' ? STATE_AT_PHASE : STATE_IN_ADDRESS;
return iter->prefix;
}
+ if (iter->state == STATE_AT_PHASE) {
+ iter->state = STATE_IN_ADDRESS;
+ return iter->phase;
+ }
+
if (iter->pointer.length == 0)
return (unsigned char) 0;
@@ -386,8 +407,9 @@ qd_field_iterator_t *qd_field_iterator_s
sub->pointer = sub->start_pointer;
sub->view = iter->view;
sub->mode = iter->mode;
- sub->at_prefix = 0;
- sub->view_prefix = 0;
+ sub->state = STATE_IN_ADDRESS;
+ sub->view_prefix = false;
+ sub->phase = '0';
return sub;
}
Modified: qpid/dispatch/trunk/src/router_config.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_config.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_config.c (original)
+++ qpid/dispatch/trunk/src/router_config.c Fri Apr 4 20:06:24 2014
@@ -24,27 +24,57 @@
#include "dispatch_private.h"
#include "router_private.h"
-static const char *CONF_ADDRESS = "fixed-address";
+static const char *CONF_ADDRESS = "fixed-address";
+static const char *CONF_WAYPOINT = "waypoint";
-void qd_router_configure(qd_router_t *router)
-{
- if (!router->qd)
- return;
+static void qd_router_configure_addresses(qd_router_t *router)
+{
int count = qd_config_item_count(router->qd, CONF_ADDRESS);
- router->config_addr_count = count;
- router->config_addrs = NEW_ARRAY(qd_config_address_t, count);
-
for (int idx = 0; idx < count; idx++) {
const char *prefix = qd_config_item_value_string(router->qd, CONF_ADDRESS, idx, "prefix");
+ int phase = qd_config_item_value_int(router->qd, CONF_ADDRESS, idx, "phase");
const char *fanout = qd_config_item_value_string(router->qd, CONF_ADDRESS, idx, "fanout");
const char *bias = qd_config_item_value_string(router->qd, CONF_ADDRESS, idx, "bias");
- router->config_addrs[idx].prefix = (char*) malloc(strlen(prefix) + 1);
- if (prefix[0] == '/')
- strcpy(router->config_addrs[idx].prefix, &prefix[1]);
- else
- strcpy(router->config_addrs[idx].prefix, prefix);
+
+ if (phase < 0 || phase > 9) {
+ qd_log(router->log_source, QD_LOG_ERROR, "Phase for prefix '%s' must be between 0 and 9. Ignoring", prefix);
+ continue;
+ }
+
+ //
+ // Search for a matching prefix in the list.
+ //
+ qd_config_address_t *addr = DEQ_HEAD(router->config_addrs);
+ while (addr) {
+ if (strcmp(addr->prefix, prefix) == 0)
+ break;
+ addr = DEQ_NEXT(addr);
+ }
+
+ if (addr == 0) {
+ //
+ // Create a new prefix
+ //
+ addr = NEW(qd_config_address_t);
+ DEQ_ITEM_INIT(addr);
+ addr->prefix = (char*) malloc(strlen(prefix) + 1);
+ addr->last_phase = (char) phase + '0';
+ DEQ_INIT(addr->phases);
+ DEQ_INSERT_TAIL(router->config_addrs, addr);
+ if (prefix[0] == '/')
+ strcpy(addr->prefix, &prefix[1]);
+ else
+ strcpy(addr->prefix, prefix);
+ }
+
+ //
+ // Add the phase to the prefix
+ //
+ qd_config_phase_t *addr_phase = NEW(qd_config_phase_t);
+ DEQ_ITEM_INIT(addr_phase);
+ addr_phase->phase = (char) phase + '0';
qd_address_semantics_t semantics = 0;
if (strcmp("multiple", fanout) == 0) semantics |= QD_FANOUT_MULTIPLE;
@@ -57,13 +87,93 @@ void qd_router_configure(qd_router_t *ro
else if (strcmp("spread", bias) == 0) semantics |= QD_BIAS_SPREAD;
else
assert(0);
- qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s fanout=%s bias=%s", prefix, fanout, bias);
+ qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s phase=%d fanout=%s bias=%s",
+ prefix, phase, fanout, bias);
} else {
semantics |= QD_BIAS_NONE;
- qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s fanout=%s", prefix, fanout);
+ qd_log(router->log_source, QD_LOG_INFO, "Configured Address: prefix=%s phase=%d fanout=%s",
+ prefix, phase, fanout);
}
- router->config_addrs[idx].semantics = semantics;
+ addr_phase->semantics = semantics;
+ addr->last_phase = addr_phase->phase;
+ DEQ_INSERT_TAIL(addr->phases, addr_phase);
}
}
+
+static void qd_router_configure_waypoints(qd_router_t *router)
+{
+ int count = qd_config_item_count(router->qd, CONF_WAYPOINT);
+
+ for (int idx = 0; idx < count; idx++) {
+ const char *name = qd_config_item_value_string(router->qd, CONF_WAYPOINT, idx, "name");
+ int in_phase = qd_config_item_value_int(router->qd, CONF_WAYPOINT, idx, "in-phase");
+ int out_phase = qd_config_item_value_int(router->qd, CONF_WAYPOINT, idx, "out-phase");
+ //const char *connector = qd_config_item_value_string(router->qd, CONF_WAYPOINT, idx, "connector");
+
+ if (in_phase < 0 || in_phase > 9 || out_phase < 0 || out_phase > 9) {
+ qd_log(router->log_source, QD_LOG_ERROR, "Phases for waypoint '%s' must be between 0 and 9. Ignoring", name);
+ continue;
+ }
+
+ qd_config_waypoint_t *waypoint = NEW(qd_config_waypoint_t);
+ DEQ_ITEM_INIT(waypoint);
+ waypoint->name = (char*) malloc(strlen(name) + 1);
+ strcpy(waypoint->name, name);
+ waypoint->in_phase = (char) in_phase + '0';
+ waypoint->out_phase = (char) out_phase + '0';
+ waypoint->connector = 0;
+ waypoint->in_link = 0;
+ waypoint->out_link = 0;
+
+ //
+ // TODO - Look up connector
+ //
+
+ DEQ_INSERT_TAIL(router->config_waypoints, waypoint);
+
+ qd_log(router->log_source, QD_LOG_INFO, "Configured Waypoint: name=%s in_phase=%d out_phase=%d",
+ name, in_phase, out_phase);
+ }
+}
+
+
+void qd_router_configure(qd_router_t *router)
+{
+ if (!router->qd)
+ return;
+ qd_router_configure_addresses(router);
+ qd_router_configure_waypoints(router);
+}
+
+
+qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
+ char in_phase, char *out_phase)
+{
+ qd_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
+
+ qd_config_address_t *addr = DEQ_HEAD(router->config_addrs);
+ qd_config_phase_t *phase = 0;
+
+ while (addr) {
+ if (qd_field_iterator_prefix(iter, addr->prefix))
+ break;
+ qd_field_iterator_reset(iter);
+ addr = DEQ_NEXT(addr);
+ }
+
+ if (addr) {
+ *out_phase = in_phase == '\0' ? addr->last_phase : in_phase;
+ phase = DEQ_HEAD(addr->phases);
+ while (phase) {
+ if (phase->phase == *out_phase)
+ break;
+ phase = DEQ_NEXT(phase);
+ }
+ }
+
+ return phase ? phase->semantics : QD_SEMANTICS_DEFAULT;
+}
+
+
Modified: qpid/dispatch/trunk/src/router_node.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_node.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_node.c (original)
+++ qpid/dispatch/trunk/src/router_node.c Fri Apr 4 20:06:24 2014
@@ -955,20 +955,6 @@ static int router_incoming_link_handler(
}
-qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter)
-{
- qd_field_iterator_reset_view(iter, ITER_VIEW_NO_HOST);
-
- for (int idx = 0; idx < router->config_addr_count; idx++) {
- if (qd_field_iterator_prefix(iter, router->config_addrs[idx].prefix))
- return router->config_addrs[idx].semantics;
- qd_field_iterator_reset(iter);
- }
-
- return QD_SEMANTICS_DEFAULT;
-}
-
-
/**
* New Outgoing Link Handler
*/
@@ -981,6 +967,7 @@ static int router_outgoing_link_handler(
int is_router = qd_router_terminus_is_router(qd_link_remote_target(link));
int propagate = 0;
qd_field_iterator_t *iter = 0;
+ char phase = '0';
qd_address_semantics_t semantics;
if (is_router && !qd_router_connection_is_inter_router(qd_link_connection(link))) {
@@ -1043,7 +1030,8 @@ static int router_outgoing_link_handler(
if (is_dynamic || !iter)
semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_BACKPRESSURE;
else {
- semantics = router_semantics_for_addr(router, iter);
+ semantics = router_semantics_for_addr(router, iter, '\0', &phase);
+ qd_field_iterator_set_phase(iter, phase);
qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
}
@@ -1071,9 +1059,9 @@ static int router_outgoing_link_handler(
qd_router_generate_temp_addr(router, temp_addr, 1000);
iter = qd_field_iterator_string(temp_addr, ITER_VIEW_ADDRESS_HASH);
pn_terminus_set_address(qd_link_source(link), temp_addr);
- qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address: %s", temp_addr);
+ qd_log(router->log_source, QD_LOG_INFO, "Assigned temporary routable address=%s", temp_addr);
} else
- qd_log(router->log_source, QD_LOG_INFO, "Registered local address: %s", r_src);
+ qd_log(router->log_source, QD_LOG_INFO, "Registered local address=%s phase=%c", r_src, phase);
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
@@ -1364,8 +1352,8 @@ qd_router_t *qd_router(qd_dispatch_t *qd
router->lock = sys_mutex();
router->timer = qd_timer(qd, qd_router_timer_handler, (void*) router);
router->dtag = 1;
- router->config_addrs = 0;
- router->config_addr_count = 0;
+ DEQ_INIT(router->config_addrs);
+ DEQ_INIT(router->config_waypoints);
//
// Configure the router from the configuration file
@@ -1444,7 +1432,7 @@ qd_address_t *qd_router_register_address
qd_address_t *addr;
qd_field_iterator_t *iter;
- strcpy(addr_string, global ? "M" : "L");
+ strcpy(addr_string, global ? "M0" : "L");
strcat(addr_string, address);
iter = qd_field_iterator_string(addr_string, ITER_VIEW_NO_HOST);
Modified: qpid/dispatch/trunk/src/router_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_private.h?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_private.h (original)
+++ qpid/dispatch/trunk/src/router_private.h Fri Apr 4 20:06:24 2014
@@ -152,42 +152,70 @@ ALLOC_DECLARE(qd_address_t);
DEQ_DECLARE(qd_address_t, qd_address_list_t);
-typedef struct {
- char *prefix;
+typedef struct qd_config_phase_t qd_config_phase_t;
+typedef struct qd_config_address_t qd_config_address_t;
+typedef struct qd_config_waypoint_t qd_config_waypoint_t;
+
+struct qd_config_phase_t {
+ DEQ_LINKS(qd_config_phase_t);
+ char phase;
qd_address_semantics_t semantics;
-} qd_config_address_t;
+};
+
+DEQ_DECLARE(qd_config_phase_t, qd_config_phase_list_t);
+
+struct qd_config_address_t {
+ DEQ_LINKS(qd_config_address_t);
+ char *prefix;
+ char last_phase;
+ qd_config_phase_list_t phases;
+};
+
+DEQ_DECLARE(qd_config_address_t, qd_config_address_list_t);
+
+struct qd_config_waypoint_t {
+ DEQ_LINKS(qd_config_waypoint_t);
+ char *name;
+ char in_phase;
+ char out_phase;
+ qd_connector_t *connector;
+ qd_link_t *in_link;
+ qd_link_t *out_link;
+};
+
+DEQ_DECLARE(qd_config_waypoint_t, qd_config_waypoint_list_t);
struct qd_router_t {
- qd_dispatch_t *qd;
- qd_log_source_t *log_source;
- qd_router_mode_t router_mode;
- const char *router_area;
- const char *router_id;
- qd_node_t *node;
-
- qd_address_list_t addrs;
- qd_hash_t *addr_hash;
- qd_address_t *router_addr;
- qd_address_t *hello_addr;
-
- qd_router_link_list_t links;
- qd_router_node_list_t routers;
- qd_router_link_t **out_links_by_mask_bit;
- qd_router_node_t **routers_by_mask_bit;
-
- qd_bitmask_t *neighbor_free_mask;
- sys_mutex_t *lock;
- qd_timer_t *timer;
- uint64_t dtag;
-
- qd_config_address_t *config_addrs;
- int config_addr_count;
-
- qd_agent_class_t *class_router;
- qd_agent_class_t *class_link;
- qd_agent_class_t *class_node;
- qd_agent_class_t *class_address;
+ qd_dispatch_t *qd;
+ qd_log_source_t *log_source;
+ qd_router_mode_t router_mode;
+ const char *router_area;
+ const char *router_id;
+ qd_node_t *node;
+
+ qd_address_list_t addrs;
+ qd_hash_t *addr_hash;
+ qd_address_t *router_addr;
+ qd_address_t *hello_addr;
+
+ qd_router_link_list_t links;
+ qd_router_node_list_t routers;
+ qd_router_link_t **out_links_by_mask_bit;
+ qd_router_node_t **routers_by_mask_bit;
+
+ qd_bitmask_t *neighbor_free_mask;
+ sys_mutex_t *lock;
+ qd_timer_t *timer;
+ uint64_t dtag;
+
+ qd_config_address_list_t config_addrs;
+ qd_config_waypoint_list_t config_waypoints;
+
+ qd_agent_class_t *class_router;
+ qd_agent_class_t *class_link;
+ qd_agent_class_t *class_node;
+ qd_agent_class_t *class_address;
};
@@ -203,4 +231,6 @@ void qd_router_mobile_added(qd_router_t
void qd_router_mobile_removed(qd_router_t *router, const char *addr);
void qd_router_link_lost(qd_router_t *router, int link_mask_bit);
+qd_address_semantics_t router_semantics_for_addr(qd_router_t *router, qd_field_iterator_t *iter,
+ char in_phase, char *out_phase);
#endif
Modified: qpid/dispatch/trunk/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_pynode.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_pynode.c (original)
+++ qpid/dispatch/trunk/src/router_pynode.c Fri Apr 4 20:06:24 2014
@@ -27,7 +27,6 @@
#include "router_private.h"
static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
-static qd_address_semantics_t default_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS;
static qd_log_source_t *log_source = 0;
static PyObject *pyRouter = 0;
@@ -360,12 +359,14 @@ static PyObject* qd_map_destination(PyOb
{
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
+ char phase;
+ char unused;
const char *addr_string;
int maskbit;
qd_address_t *addr;
qd_field_iterator_t *iter;
- if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+ if (!PyArg_ParseTuple(args, "csi", &phase, &addr_string, &maskbit))
return 0;
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -379,6 +380,7 @@ static PyObject* qd_map_destination(PyOb
}
iter = qd_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+ qd_field_iterator_set_phase(iter, phase);
sys_mutex_lock(router->lock);
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
@@ -388,8 +390,8 @@ static PyObject* qd_map_destination(PyOb
DEQ_ITEM_INIT(addr);
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- addr->semantics = default_semantics; // FIXME - Add provisioned semantics here.
qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ addr->semantics = router_semantics_for_addr(router, iter, phase, &unused);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
}
@@ -411,11 +413,12 @@ static PyObject* qd_unmap_destination(Py
{
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
+ char phase;
const char *addr_string;
int maskbit;
qd_address_t *addr;
- if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
+ if (!PyArg_ParseTuple(args, "csi", &phase, &addr_string, &maskbit))
return 0;
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -430,6 +433,7 @@ static PyObject* qd_unmap_destination(Py
qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
qd_field_iterator_t *iter = qd_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+ qd_field_iterator_set_phase(iter, phase);
sys_mutex_lock(router->lock);
qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
Modified: qpid/dispatch/trunk/tests/field_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/field_test.c?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/field_test.c (original)
+++ qpid/dispatch/trunk/tests/field_test.c Fri Apr 4 20:06:24 2014
@@ -44,7 +44,7 @@ static char* test_view_global_dns(void *
return "ITER_VIEW_NODE_SPECIFIC failed";
qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- if (!qd_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+ if (!qd_field_iterator_equal(iter, (unsigned char*) "M0global/sub"))
return "ITER_VIEW_ADDRESS_HASH failed";
return 0;
@@ -70,7 +70,7 @@ static char* test_view_global_non_dns(vo
return "ITER_VIEW_NODE_SPECIFIC failed";
qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- if (!qd_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+ if (!qd_field_iterator_equal(iter, (unsigned char*) "M0global/sub"))
return "ITER_VIEW_ADDRESS_HASH failed";
return 0;
@@ -96,7 +96,7 @@ static char* test_view_global_no_host(vo
return "ITER_VIEW_NODE_SPECIFIC failed";
qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- if (!qd_field_iterator_equal(iter, (unsigned char*) "Mglobal/sub"))
+ if (!qd_field_iterator_equal(iter, (unsigned char*) "M0global/sub"))
return "ITER_VIEW_ADDRESS_HASH failed";
return 0;
@@ -119,12 +119,14 @@ static char* test_view_address_hash(void
{"_topo/my-area/router/my-addr", "Rrouter"},
{"_topo/my-area/my-router/my-addr", "Lmy-addr"},
{"_topo/my-area/router", "Rrouter"},
+ {"amqp:/mobile", "M1mobile"},
{0, 0}
};
int idx;
for (idx = 0; cases[idx].addr; idx++) {
qd_field_iterator_t *iter = qd_field_iterator_string(cases[idx].addr, ITER_VIEW_ADDRESS_HASH);
+ qd_field_iterator_set_phase(iter, '1');
if (!qd_field_iterator_equal(iter, (unsigned char*) cases[idx].view)) {
char *got = (char*) qd_field_iterator_copy(iter);
snprintf(fail_text, FAIL_TEXT_SIZE, "Addr '%s' failed. Expected '%s', got '%s'",
Modified: qpid/dispatch/trunk/tests/system_tests_two_routers.py
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_two_routers.py?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_two_routers.py (original)
+++ qpid/dispatch/trunk/tests/system_tests_two_routers.py Fri Apr 4 20:06:24 2014
@@ -40,7 +40,7 @@ def wait_for_addr(messenger, addr, local
messenger.recv()
messenger.get(rsp)
for item in rsp.body:
- if item['addr'][1:] == addr and \
+ if item['addr'][2:] == addr and \
local_count == item['subscriber-count'] and \
remote_count == item['remote-count']:
done = True
Modified: qpid/dispatch/trunk/tools/qdstat.in
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tools/qdstat.in?rev=1584880&r1=1584879&r2=1584880&view=diff
==============================================================================
--- qpid/dispatch/trunk/tools/qdstat.in (original)
+++ qpid/dispatch/trunk/tools/qdstat.in Fri Apr 4 20:06:24 2014
@@ -170,7 +170,17 @@ class BusManager:
def _addr_text(self, addr):
if not addr:
return "-"
- return addr[1:]
+ if addr[0] == 'M':
+ return addr[2:]
+ else:
+ return addr[1:]
+
+ def _addr_phase(self, addr):
+ if not addr:
+ return "-"
+ if addr[0] == 'M':
+ return addr[1]
+ return ''
def displayGeneral(self):
disp = Display(prefix=" ")
@@ -268,6 +278,7 @@ class BusManager:
heads = []
heads.append(Header("class"))
heads.append(Header("address"))
+ heads.append(Header("phase"))
heads.append(Header("in-proc", Header.Y))
heads.append(Header("local", Header.COMMAS))
heads.append(Header("remote", Header.COMMAS))
@@ -284,6 +295,7 @@ class BusManager:
row = []
row.append(self._addr_class(addr['addr']))
row.append(self._addr_text(addr['addr']))
+ row.append(self._addr_phase(addr['addr']))
row.append(addr['in-process'])
row.append(addr['subscriber-count'])
row.append(addr['remote-count'])
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org