You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2015/12/16 22:51:05 UTC
[2/3] qpid-dispatch git commit: DISPATCH-179 - WIP checkpoint. Ripped
out large sections of the old architecture. Added the general callback
functionality for non-connection-specific actions.
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/router_pynode.c
----------------------------------------------------------------------
diff --git a/src/router_pynode.c b/src/router_pynode.c
index 77ac601..3cdc98f 100644
--- a/src/router_pynode.c
+++ b/src/router_pynode.c
@@ -28,8 +28,6 @@
#include "waypoint_private.h"
#include "entity_cache.h"
-static qd_address_semantics_t router_addr_semantics = QD_FANOUT_SINGLE | QD_BIAS_CLOSEST | QD_CONGESTION_DROP | QD_DROP_FOR_SLOW_CONSUMERS | QD_BYPASS_VALID_ORIGINS;
-
static qd_log_source_t *log_source = 0;
static PyObject *pyRouter = 0;
static PyObject *pyTick = 0;
@@ -49,84 +47,12 @@ static PyObject *qd_add_router(PyObject *self, PyObject *args)
qd_router_t *router = adapter->router;
const char *address;
int router_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit))
return 0;
qdr_core_add_router(router->router_core, address, router_maskbit);
- do {
- if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- error = "Router bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] != 0) {
- sys_mutex_unlock(router->lock);
- error = "Adding router over already existing router";
- break;
- }
-
- //
- // Hash lookup the address to ensure there isn't an existing router address.
- //
- qd_field_iterator_t *iter = qd_address_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
- qd_address_t *addr;
-
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- assert(addr == 0);
-
- //
- // Create an address record for this router and insert it in the hash table.
- // This record will be found whenever a "foreign" topological address to this
- // remote router is looked up.
- //
- addr = qd_address(router_addr_semantics);
- qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_INSERT_TAIL(router->addrs, addr);
- qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
- qd_field_iterator_free(iter);
-
- //
- // Create a router-node record to represent the remote router.
- //
- qd_router_node_t *rnode = new_qd_router_node_t();
- DEQ_ITEM_INIT(rnode);
- rnode->owning_addr = addr;
- rnode->mask_bit = router_maskbit;
- rnode->next_hop = 0;
- rnode->peer_link = 0;
- rnode->ref_count = 0;
- rnode->valid_origins = qd_bitmask(0);
-
- DEQ_INSERT_TAIL(router->routers, rnode);
-
- //
- // Link the router record to the address record.
- //
- qd_router_add_node_ref_LH(&addr->rnodes, rnode);
-
- //
- // Link the router record to the router address records.
- //
- qd_router_add_node_ref_LH(&router->router_addr->rnodes, rnode);
- qd_router_add_node_ref_LH(&router->routerma_addr->rnodes, rnode);
-
- //
- // Add the router record to the mask-bit index.
- //
- router->routers_by_mask_bit[router_maskbit] = rnode;
-
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -137,69 +63,12 @@ static PyObject* qd_del_router(PyObject *self, PyObject *args)
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
int router_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "i", &router_maskbit))
return 0;
qdr_core_del_router(router->router_core, router_maskbit);
- do {
- if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- error = "Router bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Deleting nonexistent router";
- break;
- }
-
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- qd_address_t *oaddr = rnode->owning_addr;
- assert(oaddr);
-
- qd_entity_cache_remove(QD_ROUTER_ADDRESS_TYPE, oaddr);
-
- //
- // Unlink the router node from the address record
- //
- qd_router_del_node_ref_LH(&oaddr->rnodes, rnode);
-
- //
- // While the router node has a non-zero reference count, look for addresses
- // to unlink the node from.
- //
- qd_address_t *addr = DEQ_HEAD(router->addrs);
- while (addr && rnode->ref_count > 0) {
- qd_router_del_node_ref_LH(&addr->rnodes, rnode);
- addr = DEQ_NEXT(addr);
- }
- assert(rnode->ref_count == 0);
-
- //
- // Free the router node and the owning address records.
- //
- qd_bitmask_free(rnode->valid_origins);
- DEQ_REMOVE(router->routers, rnode);
- free_qd_router_node_t(rnode);
-
- qd_hash_remove_by_handle(router->addr_hash, oaddr->hash_handle);
- DEQ_REMOVE(router->addrs, oaddr);
- qd_hash_handle_free(oaddr->hash_handle);
- router->routers_by_mask_bit[router_maskbit] = 0;
- free_qd_address_t(oaddr);
-
- sys_mutex_unlock(router->lock);
- } while(0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -211,40 +80,12 @@ static PyObject* qd_set_link(PyObject *self, PyObject *args)
qd_router_t *router = adapter->router;
int router_maskbit;
int link_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &link_maskbit))
return 0;
qdr_core_set_link(router->router_core, router_maskbit, link_maskbit);
- do {
- if (link_maskbit >= qd_bitmask_width() || link_maskbit < 0) {
- error = "Link bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->out_links_by_mask_bit[link_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Adding neighbor router with invalid link reference";
- break;
- }
-
- //
- // Add the peer_link reference to the router record.
- //
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- rnode->peer_link = router->out_links_by_mask_bit[link_maskbit];
-
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -255,25 +96,12 @@ static PyObject* qd_remove_link(PyObject *self, PyObject *args)
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
int router_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "i", &router_maskbit))
return 0;
qdr_core_remove_link(router->router_core, router_maskbit);
- do {
- sys_mutex_lock(router->lock);
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- rnode->peer_link = 0;
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -285,49 +113,12 @@ static PyObject* qd_set_next_hop(PyObject *self, PyObject *args)
qd_router_t *router = adapter->router;
int router_maskbit;
int next_hop_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit))
return 0;
qdr_core_set_next_hop(router->router_core, router_maskbit, next_hop_maskbit);
- do {
- if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- error = "Router bit mask out of range";
- break;
- }
-
- if (next_hop_maskbit >= qd_bitmask_width() || next_hop_maskbit < 0) {
- error = "Next Hop bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Router Not Found";
- break;
- }
-
- if (router->routers_by_mask_bit[next_hop_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Next Hop Not Found";
- break;
- }
-
- if (router_maskbit != next_hop_maskbit) {
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit];
- }
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -338,37 +129,12 @@ static PyObject* qd_remove_next_hop(PyObject *self, PyObject *args)
RouterAdapter *adapter = (RouterAdapter*) self;
qd_router_t *router = adapter->router;
int router_maskbit;
- char *error = 0;
if (!PyArg_ParseTuple(args, "i", &router_maskbit))
return 0;
qdr_core_remove_next_hop(router->router_core, router_maskbit);
- do {
- if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
- error = "Router bit mask out of range";
- break;
- }
-
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Router Not Found";
- break;
- }
-
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- rnode->next_hop = 0;
-
- sys_mutex_unlock(router->lock);
- } while (0);
-
- if (error) {
- PyErr_SetString(PyExc_Exception, error);
- return 0;
- }
-
Py_INCREF(Py_None);
return Py_None;
}
@@ -397,43 +163,26 @@ static PyObject* qd_set_valid_origins(PyObject *self, PyObject *args)
break;
}
- sys_mutex_lock(router->lock);
- if (router->routers_by_mask_bit[router_maskbit] == 0) {
- sys_mutex_unlock(router->lock);
- error = "Router Not Found";
- break;
- }
-
- Py_ssize_t origin_count = PyList_Size(origin_list);
- qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- qd_bitmask_t *core_bitmask = qd_bitmask(0);
- int maskbit;
+ Py_ssize_t origin_count = PyList_Size(origin_list);
+ qd_bitmask_t *core_bitmask = qd_bitmask(0);
+ int maskbit;
for (idx = 0; idx < origin_count; idx++) {
maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
-
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
error = "Origin bit mask out of range";
break;
}
-
- if (router->routers_by_mask_bit[maskbit] == 0) {
- error = "Origin router Not Found";
- break;
- }
}
if (error == 0) {
- qd_bitmask_clear_all(rnode->valid_origins);
- qd_bitmask_set_bit(rnode->valid_origins, 0); // This router is a valid origin for all destinations
+ qd_bitmask_set_bit(core_bitmask, 0); // This router is a valid origin for all destinations
for (idx = 0; idx < origin_count; idx++) {
maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
- qd_bitmask_set_bit(rnode->valid_origins, maskbit);
qd_bitmask_set_bit(core_bitmask, maskbit);
}
- }
-
- sys_mutex_unlock(router->lock);
+ } else
+ qd_bitmask_free(core_bitmask);
qdr_core_set_valid_origins(router->router_core, router_maskbit, core_bitmask);
} while (0);
@@ -450,16 +199,12 @@ static PyObject* qd_set_valid_origins(PyObject *self, PyObject *args)
static PyObject* qd_map_destination(PyObject *self, PyObject *args)
{
- RouterAdapter *adapter = (RouterAdapter*) self;
- qd_router_t *router = adapter->router;
- char phase;
- char unused;
- const char *addr_string;
- int maskbit;
- qd_address_t *addr;
- qd_field_iterator_t *iter;
-
- if (!PyArg_ParseTuple(args, "csi", &phase, &addr_string, &maskbit))
+ RouterAdapter *adapter = (RouterAdapter*) self;
+ qd_router_t *router = adapter->router;
+ const char *addr_string;
+ int maskbit;
+
+ if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
if (maskbit >= qd_bitmask_width() || maskbit < 0) {
@@ -467,34 +212,8 @@ static PyObject* qd_map_destination(PyObject *self, PyObject *args)
return 0;
}
- if (router->routers_by_mask_bit[maskbit] == 0) {
- PyErr_SetString(PyExc_Exception, "Router Not Found");
- return 0;
- }
-
- iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
-
- sys_mutex_lock(router->lock);
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- if (!addr) {
- addr = qd_address(router_semantics_for_addr(router, iter, phase, &unused));
- qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
- DEQ_ITEM_INIT(addr);
- DEQ_INSERT_TAIL(router->addrs, addr);
- qd_entity_cache_add(QD_ROUTER_ADDRESS_TYPE, addr);
- }
- qd_field_iterator_free(iter);
-
- qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
- qd_router_add_node_ref_LH(&addr->rnodes, rnode);
+ qdr_core_map_destination(router->router_core, maskbit, addr_string);
- //
- // If the address has an associated waypoint, notify the waypoint module of the changes.
- //
- if (addr->waypoint)
- qd_waypoint_address_updated_LH(router->qd, addr);
-
- sys_mutex_unlock(router->lock);
Py_INCREF(Py_None);
return Py_None;
}
@@ -506,7 +225,6 @@ static PyObject* qd_unmap_destination(PyObject *self, PyObject *args)
qd_router_t *router = adapter->router;
const char *addr_string;
int maskbit;
- qd_address_t *addr;
if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
@@ -516,35 +234,7 @@ static PyObject* qd_unmap_destination(PyObject *self, PyObject *args)
return 0;
}
- if (router->routers_by_mask_bit[maskbit] == 0) {
- PyErr_SetString(PyExc_Exception, "Router Not Found");
- return 0;
- }
-
- qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
- qd_field_iterator_t *iter = qd_address_iterator_string(addr_string, ITER_VIEW_ALL);
-
- sys_mutex_lock(router->lock);
- qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- qd_field_iterator_free(iter);
-
- if (!addr) {
- PyErr_SetString(PyExc_Exception, "Address Not Found");
- sys_mutex_unlock(router->lock);
- return 0;
- }
-
- qd_router_del_node_ref_LH(&addr->rnodes, rnode);
-
- //
- // If the address has an associated waypoint, notify the waypoint module of the changes.
- //
- if (addr->waypoint)
- qd_waypoint_address_updated_LH(router->qd, addr);
-
- sys_mutex_unlock(router->lock);
-
- qd_router_check_addr(router, addr, 0);
+ qdr_core_unmap_destination(router->router_core, maskbit, addr_string);
Py_INCREF(Py_None);
return Py_None;
@@ -625,11 +315,75 @@ static PyTypeObject RouterAdapterType = {
0 /* tp_version_tag */
};
+
+/**
+ * Route-table callback: report a newly added mobile address to the Python
+ * router module.
+ *
+ * Registered through qdr_core_route_table_handlers() in
+ * qd_router_python_setup(), which passes the qd_router_t as the context.
+ *
+ * @param context       The qd_router_t* supplied at registration time.
+ * @param address_hash  NUL-terminated address string handed to Python as the
+ *                      single positional argument of pyAdded.
+ *
+ * Does nothing unless pyAdded is set and the router is in interior mode.
+ */
+static void qd_router_mobile_added(void *context, const char *address_hash)
+{
+ qd_router_t *router = (qd_router_t*) context;
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ // All Python C-API calls below are made between qd_python_lock() and qd_python_unlock().
+ qd_python_lock_state_t lock_state = qd_python_lock();
+ pArgs = PyTuple_New(1);
+ // PyTuple_SetItem steals the reference to the new string, so only pArgs is released later.
+ PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash));
+ pValue = PyObject_CallObject(pyAdded, pArgs);
+ // NOTE(review): presumably records/clears any Python exception raised by the call - confirm in qd_error_py().
+ qd_error_py();
+ Py_DECREF(pArgs);
+ // pValue is NULL when the call failed, hence the X variant.
+ Py_XDECREF(pValue);
+ qd_python_unlock(lock_state);
+ }
+}
+
+
+/**
+ * Route-table callback: report a removed mobile address to the Python
+ * router module.
+ *
+ * Registered through qdr_core_route_table_handlers() in
+ * qd_router_python_setup(), which passes the qd_router_t as the context.
+ *
+ * @param context       The qd_router_t* supplied at registration time.
+ * @param address_hash  NUL-terminated address string handed to Python as the
+ *                      single positional argument of pyRemoved.
+ *
+ * Does nothing unless pyRemoved is set and the router is in interior mode.
+ */
+static void qd_router_mobile_removed(void *context, const char *address_hash)
+{
+ qd_router_t *router = (qd_router_t*) context;
+ PyObject *pArgs;
+ PyObject *pValue;
+
+ if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ // All Python C-API calls below are made between qd_python_lock() and qd_python_unlock().
+ qd_python_lock_state_t lock_state = qd_python_lock();
+ pArgs = PyTuple_New(1);
+ // PyTuple_SetItem steals the reference to the new string, so only pArgs is released later.
+ PyTuple_SetItem(pArgs, 0, PyString_FromString(address_hash));
+ pValue = PyObject_CallObject(pyRemoved, pArgs);
+ // NOTE(review): presumably records/clears any Python exception raised by the call - confirm in qd_error_py().
+ qd_error_py();
+ Py_DECREF(pArgs);
+ // pValue is NULL when the call failed, hence the X variant.
+ Py_XDECREF(pValue);
+ qd_python_unlock(lock_state);
+ }
+}
+
+
+/**
+ * Route-table callback: report a lost inter-router link to the Python
+ * router module.
+ *
+ * Registered through qdr_core_route_table_handlers() in
+ * qd_router_python_setup(), which passes the qd_router_t as the context.
+ *
+ * @param context        The qd_router_t* supplied at registration time.
+ * @param link_mask_bit  Mask-bit of the lost link, handed to Python as the
+ *                       single (integer) positional argument of pyLinkLost.
+ *
+ * Does nothing unless pyLinkLost is set and the router is in interior mode.
+ */
+static void qd_router_link_lost(void *context, int link_mask_bit)
+{
+    qd_router_t *router = (qd_router_t*) context;
+    PyObject    *pArgs;
+    PyObject    *pValue;
+
+    //
+    // Guard on the callable that is actually invoked (pyLinkLost).  The
+    // previous guard tested pyRemoved, which could hand a NULL callable to
+    // PyObject_CallObject or silently suppress the notification.
+    //
+    if (pyLinkLost && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+        // All Python C-API calls below are made between qd_python_lock() and qd_python_unlock().
+        qd_python_lock_state_t lock_state = qd_python_lock();
+        pArgs = PyTuple_New(1);
+        // PyTuple_SetItem steals the reference to the new int, so only pArgs is released later.
+        PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
+        pValue = PyObject_CallObject(pyLinkLost, pArgs);
+        qd_error_py();
+        Py_DECREF(pArgs);
+        // pValue is NULL when the call failed, hence the X variant.
+        Py_XDECREF(pValue);
+        qd_python_unlock(lock_state);
+    }
+}
+
+
qd_error_t qd_router_python_setup(qd_router_t *router)
{
qd_error_clear();
log_source = qd_log_source("ROUTER");
+ qdr_core_route_table_handlers(router->router_core,
+ router,
+ qd_router_mobile_added,
+ qd_router_mobile_removed,
+ qd_router_link_lost);
+
//
// If we are not operating as an interior router, don't start the
// router module.
@@ -726,62 +480,3 @@ qd_error_t qd_pyrouter_tick(qd_router_t *router)
return err;
}
-
-void qd_router_mobile_added(qd_router_t *router, qd_field_iterator_t *iter)
-{
- PyObject *pArgs;
- PyObject *pValue;
-
- if (pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
- qd_address_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- char *address = (char*) qd_field_iterator_copy(iter);
-
- qd_python_lock_state_t lock_state = qd_python_lock();
- pArgs = PyTuple_New(1);
- PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
- pValue = PyObject_CallObject(pyAdded, pArgs);
- qd_error_py();
- Py_DECREF(pArgs);
- Py_XDECREF(pValue);
- qd_python_unlock(lock_state);
-
- free(address);
- }
-}
-
-
-void qd_router_mobile_removed(qd_router_t *router, const char *address)
-{
- PyObject *pArgs;
- PyObject *pValue;
-
- if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
- qd_python_lock_state_t lock_state = qd_python_lock();
- pArgs = PyTuple_New(1);
- PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
- pValue = PyObject_CallObject(pyRemoved, pArgs);
- qd_error_py();
- Py_DECREF(pArgs);
- Py_XDECREF(pValue);
- qd_python_unlock(lock_state);
- }
-}
-
-
-void qd_router_link_lost(qd_router_t *router, int link_mask_bit)
-{
- PyObject *pArgs;
- PyObject *pValue;
-
- if (pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
- qd_python_lock_state_t lock_state = qd_python_lock();
- pArgs = PyTuple_New(1);
- PyTuple_SetItem(pArgs, 0, PyInt_FromLong((long) link_mask_bit));
- pValue = PyObject_CallObject(pyLinkLost, pArgs);
- qd_error_py();
- Py_DECREF(pArgs);
- Py_XDECREF(pValue);
- qd_python_unlock(lock_state);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/ca24d67f/src/waypoint.c
----------------------------------------------------------------------
diff --git a/src/waypoint.c b/src/waypoint.c
index dcf00b3..fb97311 100644
--- a/src/waypoint.c
+++ b/src/waypoint.c
@@ -95,7 +95,7 @@ static void qd_waypoint_visit_sink_LH(qd_dispatch_t *qd, qd_waypoint_t *wp)
if (DEQ_SIZE(addr->rlinks) == 1) {
qd_field_iterator_t *iter = qd_address_iterator_string(wp->address, ITER_VIEW_ADDRESS_HASH);
qd_address_iterator_set_phase(iter, wp->in_phase);
- qd_router_mobile_added(router, iter);
+ //qd_router_mobile_added(router, iter);
qd_field_iterator_free(iter);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org