You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2013/11/19 20:21:32 UTC
svn commit: r1543534 [7/8] - in /qpid/dispatch/trunk: ./ etc/
include/qpid/dispatch/ python/qpiddx/router/ router/ router/src/ src/
tests/ tools/src/py/
Modified: qpid/dispatch/trunk/src/router_pynode.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/router_pynode.c?rev=1543534&r1=1543533&r2=1543534&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/router_pynode.c (original)
+++ qpid/dispatch/trunk/src/router_pynode.c Tue Nov 19 19:21:30 2013
@@ -30,16 +30,16 @@ static char *module = "router.pynode";
typedef struct {
PyObject_HEAD
- dx_router_t *router;
+ qd_router_t *router;
} RouterAdapter;
-static char *dx_add_router(dx_router_t *router, const char *address, int router_maskbit, int link_maskbit)
+static char *qd_add_router(qd_router_t *router, const char *address, int router_maskbit, int link_maskbit)
{
- if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0)
+ if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0)
return "Router bit mask out of range";
- if (link_maskbit >= dx_bitmask_width() || link_maskbit < -1)
+ if (link_maskbit >= qd_bitmask_width() || link_maskbit < -1)
return "Link bit mask out of range";
sys_mutex_lock(router->lock);
@@ -56,10 +56,10 @@ static char *dx_add_router(dx_router_t *
//
// Hash lookup the address to ensure there isn't an existing router address.
//
- dx_field_iterator_t *iter = dx_field_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
- dx_address_t *addr;
+ qd_field_iterator_t *iter = qd_field_iterator_string(address, ITER_VIEW_ADDRESS_HASH);
+ qd_address_t *addr;
- dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
assert(addr == 0);
//
@@ -67,38 +67,38 @@ static char *dx_add_router(dx_router_t *
// This record will be found whenever a "foreign" topological address to this
// remote router is looked up.
//
- addr = new_dx_address_t();
- memset(addr, 0, sizeof(dx_address_t));
+ addr = new_qd_address_t();
+ memset(addr, 0, sizeof(qd_address_t));
DEQ_ITEM_INIT(addr);
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_INSERT_TAIL(router->addrs, addr);
- dx_field_iterator_free(iter);
+ qd_field_iterator_free(iter);
//
// Create a router-node record to represent the remote router.
//
- dx_router_node_t *rnode = new_dx_router_node_t();
+ qd_router_node_t *rnode = new_qd_router_node_t();
DEQ_ITEM_INIT(rnode);
rnode->owning_addr = addr;
rnode->mask_bit = router_maskbit;
rnode->next_hop = 0;
rnode->peer_link = 0;
rnode->ref_count = 0;
- rnode->valid_origins = dx_bitmask(0);
+ rnode->valid_origins = qd_bitmask(0);
DEQ_INSERT_TAIL(router->routers, rnode);
//
// Link the router record to the address record.
//
- dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+ qd_router_add_node_ref_LH(&addr->rnodes, rnode);
//
// Link the router record to the router address record.
//
- dx_router_add_node_ref_LH(&router->router_addr->rnodes, rnode);
+ qd_router_add_node_ref_LH(&router->router_addr->rnodes, rnode);
//
// Add the router record to the mask-bit index.
@@ -117,9 +117,9 @@ static char *dx_add_router(dx_router_t *
}
-static char *dx_del_router(dx_router_t *router, int router_maskbit)
+static char *qd_del_router(qd_router_t *router, int router_maskbit)
{
- if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0)
+ if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0)
return "Router bit mask out of range";
sys_mutex_lock(router->lock);
@@ -128,22 +128,22 @@ static char *dx_del_router(dx_router_t *
return "Deleting nonexistent router";
}
- dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
- dx_address_t *oaddr = rnode->owning_addr;
+ qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ qd_address_t *oaddr = rnode->owning_addr;
assert(oaddr);
//
// Unlink the router node from the address record
//
- dx_router_del_node_ref_LH(&oaddr->rnodes, rnode);
+ qd_router_del_node_ref_LH(&oaddr->rnodes, rnode);
//
// While the router node has a non-zero reference count, look for addresses
// to unlink the node from.
//
- dx_address_t *addr = DEQ_HEAD(router->addrs);
+ qd_address_t *addr = DEQ_HEAD(router->addrs);
while (addr && rnode->ref_count > 0) {
- dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+ qd_router_del_node_ref_LH(&addr->rnodes, rnode);
addr = DEQ_NEXT(addr);
}
assert(rnode->ref_count == 0);
@@ -151,32 +151,32 @@ static char *dx_del_router(dx_router_t *
//
// Free the router node and the owning address records.
//
- dx_bitmask_free(rnode->valid_origins);
+ qd_bitmask_free(rnode->valid_origins);
DEQ_REMOVE(router->routers, rnode);
- free_dx_router_node_t(rnode);
+ free_qd_router_node_t(rnode);
- dx_hash_remove_by_handle(router->addr_hash, oaddr->hash_handle);
+ qd_hash_remove_by_handle(router->addr_hash, oaddr->hash_handle);
DEQ_REMOVE(router->addrs, oaddr);
- dx_hash_handle_free(oaddr->hash_handle);
+ qd_hash_handle_free(oaddr->hash_handle);
router->routers_by_mask_bit[router_maskbit] = 0;
- free_dx_address_t(oaddr);
+ free_qd_address_t(oaddr);
sys_mutex_unlock(router->lock);
return 0;
}
-static PyObject* dx_add_remote_router(PyObject *self, PyObject *args)
+static PyObject* qd_add_remote_router(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
- dx_router_t *router = adapter->router;
+ qd_router_t *router = adapter->router;
const char *address;
int router_maskbit;
if (!PyArg_ParseTuple(args, "si", &address, &router_maskbit))
return 0;
- char *error = dx_add_router(router, address, router_maskbit, -1);
+ char *error = qd_add_router(router, address, router_maskbit, -1);
if (error) {
PyErr_SetString(PyExc_Exception, error);
return 0;
@@ -187,16 +187,16 @@ static PyObject* dx_add_remote_router(Py
}
-static PyObject* dx_del_remote_router(PyObject *self, PyObject *args)
+static PyObject* qd_del_remote_router(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
- dx_router_t *router = adapter->router;
+ qd_router_t *router = adapter->router;
int router_maskbit;
if (!PyArg_ParseTuple(args, "i", &router_maskbit))
return 0;
- char *error = dx_del_router(router, router_maskbit);
+ char *error = qd_del_router(router, router_maskbit);
if (error) {
PyErr_SetString(PyExc_Exception, error);
return 0;
@@ -207,22 +207,22 @@ static PyObject* dx_del_remote_router(Py
}
-static PyObject* dx_set_next_hop(PyObject *self, PyObject *args)
+static PyObject* qd_set_next_hop(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
- dx_router_t *router = adapter->router;
+ qd_router_t *router = adapter->router;
int router_maskbit;
int next_hop_maskbit;
if (!PyArg_ParseTuple(args, "ii", &router_maskbit, &next_hop_maskbit))
return 0;
- if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) {
+ if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
return 0;
}
- if (next_hop_maskbit >= dx_bitmask_width() || next_hop_maskbit < 0) {
+ if (next_hop_maskbit >= qd_bitmask_width() || next_hop_maskbit < 0) {
PyErr_SetString(PyExc_Exception, "Next Hop bit mask out of range");
return 0;
}
@@ -238,7 +238,7 @@ static PyObject* dx_set_next_hop(PyObjec
}
if (router_maskbit != next_hop_maskbit) {
- dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
rnode->next_hop = router->routers_by_mask_bit[next_hop_maskbit];
}
@@ -247,10 +247,10 @@ static PyObject* dx_set_next_hop(PyObjec
}
-static PyObject* dx_set_valid_origins(PyObject *self, PyObject *args)
+static PyObject* qd_set_valid_origins(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
- dx_router_t *router = adapter->router;
+ qd_router_t *router = adapter->router;
int router_maskbit;
PyObject *origin_list;
Py_ssize_t idx;
@@ -258,7 +258,7 @@ static PyObject* dx_set_valid_origins(Py
if (!PyArg_ParseTuple(args, "iO", &router_maskbit, &origin_list))
return 0;
- if (router_maskbit >= dx_bitmask_width() || router_maskbit < 0) {
+ if (router_maskbit >= qd_bitmask_width() || router_maskbit < 0) {
PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
return 0;
}
@@ -274,13 +274,13 @@ static PyObject* dx_set_valid_origins(Py
}
Py_ssize_t origin_count = PyList_Size(origin_list);
- dx_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
+ qd_router_node_t *rnode = router->routers_by_mask_bit[router_maskbit];
int maskbit;
for (idx = 0; idx < origin_count; idx++) {
maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
- if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ if (maskbit >= qd_bitmask_width() || maskbit < 0) {
PyErr_SetString(PyExc_Exception, "Origin bit mask out of range");
return 0;
}
@@ -291,11 +291,11 @@ static PyObject* dx_set_valid_origins(Py
}
}
- dx_bitmask_clear_all(rnode->valid_origins);
- dx_bitmask_set_bit(rnode->valid_origins, 0); // This router is a valid origin for all destinations
+ qd_bitmask_clear_all(rnode->valid_origins);
+ qd_bitmask_set_bit(rnode->valid_origins, 0); // This router is a valid origin for all destinations
for (idx = 0; idx < origin_count; idx++) {
maskbit = PyInt_AS_LONG(PyList_GetItem(origin_list, idx));
- dx_bitmask_set_bit(rnode->valid_origins, maskbit);
+ qd_bitmask_set_bit(rnode->valid_origins, maskbit);
}
Py_INCREF(Py_None);
@@ -303,10 +303,10 @@ static PyObject* dx_set_valid_origins(Py
}
-static PyObject* dx_add_neighbor_router(PyObject *self, PyObject *args)
+static PyObject* qd_add_neighbor_router(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
- dx_router_t *router = adapter->router;
+ qd_router_t *router = adapter->router;
const char *address;
int router_maskbit;
int link_maskbit;
@@ -314,7 +314,7 @@ static PyObject* dx_add_neighbor_router(
if (!PyArg_ParseTuple(args, "sii", &address, &router_maskbit, &link_maskbit))
return 0;
- char *error = dx_add_router(router, address, router_maskbit, link_maskbit);
+ char *error = qd_add_router(router, address, router_maskbit, link_maskbit);
if (error) {
PyErr_SetString(PyExc_Exception, error);
return 0;
@@ -325,16 +325,16 @@ static PyObject* dx_add_neighbor_router(
}
-static PyObject* dx_del_neighbor_router(PyObject *self, PyObject *args)
+static PyObject* qd_del_neighbor_router(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
- dx_router_t *router = adapter->router;
+ qd_router_t *router = adapter->router;
int router_maskbit;
if (!PyArg_ParseTuple(args, "i", &router_maskbit))
return 0;
- char *error = dx_del_router(router, router_maskbit);
+ char *error = qd_del_router(router, router_maskbit);
if (error) {
PyErr_SetString(PyExc_Exception, error);
return 0;
@@ -345,19 +345,19 @@ static PyObject* dx_del_neighbor_router(
}
-static PyObject* dx_map_destination(PyObject *self, PyObject *args)
+static PyObject* qd_map_destination(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
- dx_router_t *router = adapter->router;
+ qd_router_t *router = adapter->router;
const char *addr_string;
int maskbit;
- dx_address_t *addr;
- dx_field_iterator_t *iter;
+ qd_address_t *addr;
+ qd_field_iterator_t *iter;
if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
- if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ if (maskbit >= qd_bitmask_width() || maskbit < 0) {
PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
return 0;
}
@@ -367,46 +367,46 @@ static PyObject* dx_map_destination(PyOb
return 0;
}
- iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+ iter = qd_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
- dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
if (!addr) {
- addr = new_dx_address_t();
- memset(addr, 0, sizeof(dx_address_t));
+ addr = new_qd_address_t();
+ memset(addr, 0, sizeof(qd_address_t));
DEQ_ITEM_INIT(addr);
DEQ_INIT(addr->rlinks);
DEQ_INIT(addr->rnodes);
- dx_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
+ qd_hash_insert(router->addr_hash, iter, addr, &addr->hash_handle);
DEQ_ITEM_INIT(addr);
DEQ_INSERT_TAIL(router->addrs, addr);
}
- dx_field_iterator_free(iter);
+ qd_field_iterator_free(iter);
- dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
- dx_router_add_node_ref_LH(&addr->rnodes, rnode);
+ qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+ qd_router_add_node_ref_LH(&addr->rnodes, rnode);
sys_mutex_unlock(router->lock);
- dx_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit);
+ qd_log(module, LOG_DEBUG, "Remote Destination '%s' Mapped to router %d", addr_string, maskbit);
Py_INCREF(Py_None);
return Py_None;
}
-static PyObject* dx_unmap_destination(PyObject *self, PyObject *args)
+static PyObject* qd_unmap_destination(PyObject *self, PyObject *args)
{
RouterAdapter *adapter = (RouterAdapter*) self;
- dx_router_t *router = adapter->router;
+ qd_router_t *router = adapter->router;
const char *addr_string;
int maskbit;
- dx_address_t *addr;
+ qd_address_t *addr;
if (!PyArg_ParseTuple(args, "si", &addr_string, &maskbit))
return 0;
- if (maskbit >= dx_bitmask_width() || maskbit < 0) {
+ if (maskbit >= qd_bitmask_width() || maskbit < 0) {
PyErr_SetString(PyExc_Exception, "Router bit mask out of range");
return 0;
}
@@ -416,12 +416,12 @@ static PyObject* dx_unmap_destination(Py
return 0;
}
- dx_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
- dx_field_iterator_t *iter = dx_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
+ qd_router_node_t *rnode = router->routers_by_mask_bit[maskbit];
+ qd_field_iterator_t *iter = qd_field_iterator_string(addr_string, ITER_VIEW_ADDRESS_HASH);
sys_mutex_lock(router->lock);
- dx_hash_retrieve(router->addr_hash, iter, (void**) &addr);
- dx_field_iterator_free(iter);
+ qd_hash_retrieve(router->addr_hash, iter, (void**) &addr);
+ qd_field_iterator_free(iter);
if (!addr) {
PyErr_SetString(PyExc_Exception, "Address Not Found");
@@ -429,12 +429,12 @@ static PyObject* dx_unmap_destination(Py
return 0;
}
- dx_router_del_node_ref_LH(&addr->rnodes, rnode);
+ qd_router_del_node_ref_LH(&addr->rnodes, rnode);
sys_mutex_unlock(router->lock);
- dx_router_check_addr(router, addr, 0);
+ qd_router_check_addr(router, addr, 0);
- dx_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit);
+ qd_log(module, LOG_DEBUG, "Remote Destination '%s' Unmapped from router %d", addr_string, maskbit);
Py_INCREF(Py_None);
return Py_None;
@@ -442,14 +442,14 @@ static PyObject* dx_unmap_destination(Py
static PyMethodDef RouterAdapter_methods[] = {
- {"add_remote_router", dx_add_remote_router, METH_VARARGS, "A new remote/reachable router has been discovered"},
- {"del_remote_router", dx_del_remote_router, METH_VARARGS, "We've lost reachability to a remote router"},
- {"set_next_hop", dx_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"},
- {"set_valid_origins", dx_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"},
- {"add_neighbor_router", dx_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"},
- {"del_neighbor_router", dx_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"},
- {"map_destination", dx_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"},
- {"unmap_destination", dx_unmap_destination, METH_VARARGS, "Delete a destination mapping"},
+ {"add_remote_router", qd_add_remote_router, METH_VARARGS, "A new remote/reachable router has been discovered"},
+ {"del_remote_router", qd_del_remote_router, METH_VARARGS, "We've lost reachability to a remote router"},
+ {"set_next_hop", qd_set_next_hop, METH_VARARGS, "Set the next hop for a remote router"},
+ {"set_valid_origins", qd_set_valid_origins, METH_VARARGS, "Set the valid origins for a remote router"},
+ {"add_neighbor_router", qd_add_neighbor_router, METH_VARARGS, "A new neighbor router has been discovered"},
+ {"del_neighbor_router", qd_del_neighbor_router, METH_VARARGS, "We've lost reachability to a neighbor router"},
+ {"map_destination", qd_map_destination, METH_VARARGS, "Add a newly discovered destination mapping"},
+ {"unmap_destination", qd_unmap_destination, METH_VARARGS, "Delete a destination mapping"},
{0, 0, 0, 0}
};
@@ -505,21 +505,21 @@ static PyTypeObject RouterAdapterType =
};
-void dx_router_python_setup(dx_router_t *router)
+void qd_router_python_setup(qd_router_t *router)
{
//
// If we are not operating as an interior router, don't start the
// router module.
//
- if (router->router_mode != DX_ROUTER_MODE_INTERIOR)
+ if (router->router_mode != QD_ROUTER_MODE_INTERIOR)
return;
- PyObject *pDispatchModule = dx_python_module();
+ PyObject *pDispatchModule = qd_python_module();
RouterAdapterType.tp_new = PyType_GenericNew;
if (PyType_Ready(&RouterAdapterType) < 0) {
PyErr_Print();
- dx_log(module, LOG_CRITICAL, "Unable to initialize the Python Router Adapter");
+ qd_log(module, LOG_CRITICAL, "Unable to initialize the Python Router Adapter");
return;
}
@@ -538,17 +538,17 @@ void dx_router_python_setup(dx_router_t
PyObject* pClass;
PyObject* pArgs;
- pName = PyString_FromString("qpid.dispatch.router");
+ pName = PyString_FromString("qpiddx.router");
pModule = PyImport_Import(pName);
Py_DECREF(pName);
if (!pModule) {
- dx_log(module, LOG_CRITICAL, "Can't Locate 'router' Python module");
+ qd_log(module, LOG_CRITICAL, "Can't Locate 'router' Python module");
return;
}
pClass = PyObject_GetAttrString(pModule, "RouterEngine");
if (!pClass || !PyClass_Check(pClass)) {
- dx_log(module, LOG_CRITICAL, "Can't Locate 'RouterEngine' class in the 'router' module");
+ qd_log(module, LOG_CRITICAL, "Can't Locate 'RouterEngine' class in the 'router' module");
return;
}
@@ -575,7 +575,7 @@ void dx_router_python_setup(dx_router_t
PyTuple_SetItem(pArgs, 2, pArea);
// arg 3: max_routers
- pMaxRouters = PyInt_FromLong((long) dx_bitmask_width());
+ pMaxRouters = PyInt_FromLong((long) qd_bitmask_width());
PyTuple_SetItem(pArgs, 3, pMaxRouters);
//
@@ -587,37 +587,37 @@ void dx_router_python_setup(dx_router_t
if (!router->pyRouter) {
PyErr_Print();
- dx_log(module, LOG_CRITICAL, "'RouterEngine' class cannot be instantiated");
+ qd_log(module, LOG_CRITICAL, "'RouterEngine' class cannot be instantiated");
return;
}
router->pyTick = PyObject_GetAttrString(router->pyRouter, "handleTimerTick");
if (!router->pyTick || !PyCallable_Check(router->pyTick)) {
- dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method");
+ qd_log(module, LOG_CRITICAL, "'RouterEngine' class has no handleTimerTick method");
return;
}
router->pyAdded = PyObject_GetAttrString(router->pyRouter, "addressAdded");
if (!router->pyAdded || !PyCallable_Check(router->pyAdded)) {
- dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method");
+ qd_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressAdded method");
return;
}
router->pyRemoved = PyObject_GetAttrString(router->pyRouter, "addressRemoved");
if (!router->pyRemoved || !PyCallable_Check(router->pyRemoved)) {
- dx_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method");
+ qd_log(module, LOG_CRITICAL, "'RouterEngine' class has no addressRemoved method");
return;
}
}
-void dx_pyrouter_tick(dx_router_t *router)
+void qd_pyrouter_tick(qd_router_t *router)
{
PyObject *pArgs;
PyObject *pValue;
- if (router->pyTick && router->router_mode == DX_ROUTER_MODE_INTERIOR) {
- dx_python_lock();
+ if (router->pyTick && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ qd_python_lock();
pArgs = PyTuple_New(0);
pValue = PyObject_CallObject(router->pyTick, pArgs);
if (PyErr_Occurred()) {
@@ -627,21 +627,21 @@ void dx_pyrouter_tick(dx_router_t *route
if (pValue) {
Py_DECREF(pValue);
}
- dx_python_unlock();
+ qd_python_unlock();
}
}
-void dx_router_mobile_added(dx_router_t *router, dx_field_iterator_t *iter)
+void qd_router_mobile_added(qd_router_t *router, qd_field_iterator_t *iter)
{
PyObject *pArgs;
PyObject *pValue;
- if (router->pyAdded && router->router_mode == DX_ROUTER_MODE_INTERIOR) {
- dx_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
- char *address = (char*) dx_field_iterator_copy(iter);
+ if (router->pyAdded && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ qd_field_iterator_reset_view(iter, ITER_VIEW_ADDRESS_HASH);
+ char *address = (char*) qd_field_iterator_copy(iter);
- dx_python_lock();
+ qd_python_lock();
pArgs = PyTuple_New(1);
PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
pValue = PyObject_CallObject(router->pyAdded, pArgs);
@@ -652,20 +652,20 @@ void dx_router_mobile_added(dx_router_t
if (pValue) {
Py_DECREF(pValue);
}
- dx_python_unlock();
+ qd_python_unlock();
free(address);
}
}
-void dx_router_mobile_removed(dx_router_t *router, const char *address)
+void qd_router_mobile_removed(qd_router_t *router, const char *address)
{
PyObject *pArgs;
PyObject *pValue;
- if (router->pyRemoved && router->router_mode == DX_ROUTER_MODE_INTERIOR) {
- dx_python_lock();
+ if (router->pyRemoved && router->router_mode == QD_ROUTER_MODE_INTERIOR) {
+ qd_python_lock();
pArgs = PyTuple_New(1);
PyTuple_SetItem(pArgs, 0, PyString_FromString(address));
pValue = PyObject_CallObject(router->pyRemoved, pArgs);
@@ -676,7 +676,7 @@ void dx_router_mobile_removed(dx_router_
if (pValue) {
Py_DECREF(pValue);
}
- dx_python_unlock();
+ qd_python_unlock();
}
}
Modified: qpid/dispatch/trunk/src/server.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server.c?rev=1543534&r1=1543533&r2=1543534&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server.c (original)
+++ qpid/dispatch/trunk/src/server.c Tue Nov 19 19:21:30 2013
@@ -31,60 +31,60 @@
#include <string.h>
static char *module="SERVER";
-static __thread dx_server_t *thread_server = 0;
+static __thread qd_server_t *thread_server = 0;
-typedef struct dx_thread_t {
- dx_server_t *dx_server;
+typedef struct qd_thread_t {
+ qd_server_t *qd_server;
int thread_id;
volatile int running;
volatile int canceled;
int using_thread;
sys_thread_t *thread;
-} dx_thread_t;
+} qd_thread_t;
-struct dx_server_t {
+struct qd_server_t {
int thread_count;
const char *container_name;
pn_driver_t *driver;
- dx_thread_start_cb_t start_handler;
- dx_conn_handler_cb_t conn_handler;
- dx_user_fd_handler_cb_t ufd_handler;
+ qd_thread_start_cb_t start_handler;
+ qd_conn_handler_cb_t conn_handler;
+ qd_user_fd_handler_cb_t ufd_handler;
void *start_context;
void *conn_handler_context;
sys_cond_t *cond;
sys_mutex_t *lock;
- dx_thread_t **threads;
+ qd_thread_t **threads;
work_queue_t *work_queue;
- dx_timer_list_t pending_timers;
+ qd_timer_list_t pending_timers;
bool a_thread_is_waiting;
int threads_active;
int pause_requests;
int threads_paused;
int pause_next_sequence;
int pause_now_serving;
- dx_signal_handler_cb_t signal_handler;
+ qd_signal_handler_cb_t signal_handler;
void *signal_context;
int pending_signal;
- dx_connection_list_t connections;
+ qd_connection_list_t connections;
};
-ALLOC_DEFINE(dx_listener_t);
-ALLOC_DEFINE(dx_connector_t);
-ALLOC_DEFINE(dx_connection_t);
-ALLOC_DEFINE(dx_user_fd_t);
+ALLOC_DEFINE(qd_listener_t);
+ALLOC_DEFINE(qd_connector_t);
+ALLOC_DEFINE(qd_connection_t);
+ALLOC_DEFINE(qd_user_fd_t);
-static dx_thread_t *thread(dx_server_t *dx_server, int id)
+static qd_thread_t *thread(qd_server_t *qd_server, int id)
{
- dx_thread_t *thread = NEW(dx_thread_t);
+ qd_thread_t *thread = NEW(qd_thread_t);
if (!thread)
return 0;
- thread->dx_server = dx_server;
+ thread->qd_server = qd_server;
thread->thread_id = id;
thread->running = 0;
thread->canceled = 0;
@@ -94,23 +94,23 @@ static dx_thread_t *thread(dx_server_t *
}
-static void thread_process_listeners(dx_server_t *dx_server)
+static void thread_process_listeners(qd_server_t *qd_server)
{
- pn_driver_t *driver = dx_server->driver;
+ pn_driver_t *driver = qd_server->driver;
pn_listener_t *listener = pn_driver_listener(driver);
pn_connector_t *cxtr;
- dx_connection_t *ctx;
+ qd_connection_t *ctx;
while (listener) {
cxtr = pn_listener_accept(listener);
- dx_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr));
- ctx = new_dx_connection_t();
+ qd_log(module, LOG_TRACE, "Accepting Connection from %s", pn_connector_name(cxtr));
+ ctx = new_qd_connection_t();
DEQ_ITEM_INIT(ctx);
ctx->state = CONN_STATE_OPENING;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
ctx->pn_cxtr = cxtr;
- ctx->listener = (dx_listener_t*) pn_listener_context(listener);
+ ctx->listener = (qd_listener_t*) pn_listener_context(listener);
ctx->connector = 0;
ctx->context = ctx->listener->context;
ctx->user_context = 0;
@@ -118,20 +118,20 @@ static void thread_process_listeners(dx_
ctx->ufd = 0;
pn_connection_t *conn = pn_connection();
- pn_connection_set_container(conn, dx_server->container_name);
+ pn_connection_set_container(conn, qd_server->container_name);
pn_connector_set_connection(cxtr, conn);
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
- dx_log(module, LOG_DEBUG, "added listener connection");
- // dx_server->lock is already locked
- DEQ_INSERT_TAIL(dx_server->connections, ctx);
+ qd_log(module, LOG_DEBUG, "added listener connection");
+ // qd_server->lock is already locked
+ DEQ_INSERT_TAIL(qd_server->connections, ctx);
//
// Get a pointer to the transport so we can insert security components into it
//
pn_transport_t *tport = pn_connector_transport(cxtr);
- const dx_server_config_t *config = ctx->listener->config;
+ const qd_server_config_t *config = ctx->listener->config;
//
// Set up SSL if appropriate
@@ -167,41 +167,41 @@ static void thread_process_listeners(dx_
}
-static void handle_signals_LH(dx_server_t *dx_server)
+static void handle_signals_LH(qd_server_t *qd_server)
{
- int signum = dx_server->pending_signal;
+ int signum = qd_server->pending_signal;
if (signum) {
- dx_server->pending_signal = 0;
- if (dx_server->signal_handler) {
- sys_mutex_unlock(dx_server->lock);
- dx_server->signal_handler(dx_server->signal_context, signum);
- sys_mutex_lock(dx_server->lock);
+ qd_server->pending_signal = 0;
+ if (qd_server->signal_handler) {
+ sys_mutex_unlock(qd_server->lock);
+ qd_server->signal_handler(qd_server->signal_context, signum);
+ sys_mutex_lock(qd_server->lock);
}
}
}
-static void block_if_paused_LH(dx_server_t *dx_server)
+static void block_if_paused_LH(qd_server_t *qd_server)
{
- if (dx_server->pause_requests > 0) {
- dx_server->threads_paused++;
- sys_cond_signal_all(dx_server->cond);
- while (dx_server->pause_requests > 0)
- sys_cond_wait(dx_server->cond, dx_server->lock);
- dx_server->threads_paused--;
+ if (qd_server->pause_requests > 0) {
+ qd_server->threads_paused++;
+ sys_cond_signal_all(qd_server->cond);
+ while (qd_server->pause_requests > 0)
+ sys_cond_wait(qd_server->cond, qd_server->lock);
+ qd_server->threads_paused--;
}
}
-static int process_connector(dx_server_t *dx_server, pn_connector_t *cxtr)
+static int process_connector(qd_server_t *qd_server, pn_connector_t *cxtr)
{
- dx_connection_t *ctx = pn_connector_context(cxtr);
+ qd_connection_t *ctx = pn_connector_context(cxtr);
int events = 0;
int passes = 0;
if (ctx->state == CONN_STATE_USER) {
- dx_server->ufd_handler(ctx->ufd->context, ctx->ufd);
+ qd_server->ufd_handler(ctx->ufd->context, ctx->ufd);
return 1;
}
@@ -225,13 +225,13 @@ static int process_connector(dx_server_t
}
pn_connection_t *conn = pn_connection();
- pn_connection_set_container(conn, dx_server->container_name);
+ pn_connection_set_container(conn, qd_server->container_name);
pn_connector_set_connection(cxtr, conn);
pn_connection_set_context(conn, ctx);
ctx->pn_conn = conn;
pn_transport_t *tport = pn_connector_transport(cxtr);
- const dx_server_config_t *config = ctx->connector->config;
+ const qd_server_config_t *config = ctx->connector->config;
//
// Set up SSL if appropriate
@@ -272,41 +272,41 @@ static int process_connector(dx_server_t
if (pn_sasl_outcome(sasl) == PN_SASL_OK) {
ctx->state = CONN_STATE_OPERATIONAL;
- dx_conn_event_t ce = DX_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
+ qd_conn_event_t ce = QD_CONN_EVENT_PROCESS; // Initialize to keep the compiler happy
if (ctx->listener) {
- ce = DX_CONN_EVENT_LISTENER_OPEN;
+ ce = QD_CONN_EVENT_LISTENER_OPEN;
} else if (ctx->connector) {
- ce = DX_CONN_EVENT_CONNECTOR_OPEN;
+ ce = QD_CONN_EVENT_CONNECTOR_OPEN;
ctx->connector->delay = 0;
} else
assert(0);
- dx_server->conn_handler(dx_server->conn_handler_context,
- ctx->context, ce, (dx_connection_t*) pn_connector_context(cxtr));
+ qd_server->conn_handler(qd_server->conn_handler_context,
+ ctx->context, ce, (qd_connection_t*) pn_connector_context(cxtr));
events = 1;
break;
}
else if (pn_sasl_outcome(sasl) != PN_SASL_NONE) {
ctx->state = CONN_STATE_FAILED;
if (ctx->connector) {
- const dx_server_config_t *config = ctx->connector->config;
- dx_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
+ const qd_server_config_t *config = ctx->connector->config;
+ qd_log(module, LOG_TRACE, "Connection to %s:%s failed", config->host, config->port);
}
}
}
case CONN_STATE_OPERATIONAL:
if (pn_connector_closed(cxtr)) {
- dx_server->conn_handler(dx_server->conn_handler_context, ctx->context,
- DX_CONN_EVENT_CLOSE,
- (dx_connection_t*) pn_connector_context(cxtr));
+ qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
+ QD_CONN_EVENT_CLOSE,
+ (qd_connection_t*) pn_connector_context(cxtr));
events = 0;
}
else
- events = dx_server->conn_handler(dx_server->conn_handler_context, ctx->context,
- DX_CONN_EVENT_PROCESS,
- (dx_connection_t*) pn_connector_context(cxtr));
+ events = qd_server->conn_handler(qd_server->conn_handler_context, ctx->context,
+ QD_CONN_EVENT_PROCESS,
+ (qd_connection_t*) pn_connector_context(cxtr));
break;
default:
@@ -330,18 +330,18 @@ void pn_driver_wait_3(pn_driver_t *d);
static void *thread_run(void *arg)
{
- dx_thread_t *thread = (dx_thread_t*) arg;
- dx_server_t *dx_server = thread->dx_server;
+ qd_thread_t *thread = (qd_thread_t*) arg;
+ qd_server_t *qd_server = thread->qd_server;
pn_connector_t *work;
pn_connection_t *conn;
- dx_connection_t *ctx;
+ qd_connection_t *ctx;
int error;
int poll_result;
if (!thread)
return 0;
- thread_server = dx_server;
+ thread_server = qd_server;
thread->running = 1;
if (thread->canceled)
@@ -351,81 +351,81 @@ static void *thread_run(void *arg)
// Invoke the start handler if the application supplied one.
// This handler can be used to set NUMA or processor affinnity for the thread.
//
- if (dx_server->start_handler)
- dx_server->start_handler(dx_server->start_context, thread->thread_id);
+ if (qd_server->start_handler)
+ qd_server->start_handler(qd_server->start_context, thread->thread_id);
//
// Main Loop
//
while (thread->running) {
- sys_mutex_lock(dx_server->lock);
+ sys_mutex_lock(qd_server->lock);
//
// Check for pending signals to process
//
- handle_signals_LH(dx_server);
+ handle_signals_LH(qd_server);
if (!thread->running) {
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_unlock(qd_server->lock);
break;
}
//
// Check to see if the server is pausing. If so, block here.
//
- block_if_paused_LH(dx_server);
+ block_if_paused_LH(qd_server);
if (!thread->running) {
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_unlock(qd_server->lock);
break;
}
//
// Service pending timers.
//
- dx_timer_t *timer = DEQ_HEAD(dx_server->pending_timers);
+ qd_timer_t *timer = DEQ_HEAD(qd_server->pending_timers);
if (timer) {
- DEQ_REMOVE_HEAD(dx_server->pending_timers);
+ DEQ_REMOVE_HEAD(qd_server->pending_timers);
//
// Mark the timer as idle in case it reschedules itself.
//
- dx_timer_idle_LH(timer);
+ qd_timer_idle_LH(timer);
//
// Release the lock and invoke the connection handler.
//
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_unlock(qd_server->lock);
timer->handler(timer->context);
- pn_driver_wakeup(dx_server->driver);
+ pn_driver_wakeup(qd_server->driver);
continue;
}
//
// Check the work queue for connectors scheduled for processing.
//
- work = work_queue_get(dx_server->work_queue);
+ work = work_queue_get(qd_server->work_queue);
if (!work) {
//
// There is no pending work to do
//
- if (dx_server->a_thread_is_waiting) {
+ if (qd_server->a_thread_is_waiting) {
//
// Another thread is waiting on the proton driver, this thread must
// wait on the condition variable until signaled.
//
- sys_cond_wait(dx_server->cond, dx_server->lock);
+ sys_cond_wait(qd_server->cond, qd_server->lock);
} else {
//
// This thread elects itself to wait on the proton driver. Set the
// thread-is-waiting flag so other idle threads will not interfere.
//
- dx_server->a_thread_is_waiting = true;
+ qd_server->a_thread_is_waiting = true;
//
// Ask the timer module when its next timer is scheduled to fire. We'll
// use this value in driver_wait as the timeout. If there are no scheduled
// timers, the returned value will be -1.
//
- long duration = dx_timer_next_duration_LH();
+ long duration = qd_timer_next_duration_LH();
//
// Invoke the proton driver's wait sequence. This is a bit of a hack for now
@@ -433,25 +433,25 @@ static void *thread_run(void *arg)
// the first and third of which need to be non-reentrant, and the second of which
// must be reentrant (and blocks).
//
- pn_driver_wait_1(dx_server->driver);
- sys_mutex_unlock(dx_server->lock);
+ pn_driver_wait_1(qd_server->driver);
+ sys_mutex_unlock(qd_server->lock);
do {
error = 0;
- poll_result = pn_driver_wait_2(dx_server->driver, duration);
+ poll_result = pn_driver_wait_2(qd_server->driver, duration);
if (poll_result == -1)
- error = pn_driver_errno(dx_server->driver);
+ error = pn_driver_errno(qd_server->driver);
} while (error == PN_INTR);
if (error) {
- dx_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(dx_server->driver)));
+ qd_log(module, LOG_ERROR, "Driver Error: %s", pn_error_text(pn_error(qd_server->driver)));
exit(-1);
}
- sys_mutex_lock(dx_server->lock);
- pn_driver_wait_3(dx_server->driver);
+ sys_mutex_lock(qd_server->lock);
+ pn_driver_wait_3(qd_server->driver);
if (!thread->running) {
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_unlock(qd_server->lock);
break;
}
@@ -461,12 +461,12 @@ static void *thread_run(void *arg)
struct timespec tv;
clock_gettime(CLOCK_REALTIME, &tv);
long milliseconds = tv.tv_sec * 1000 + tv.tv_nsec / 1000000;
- dx_timer_visit_LH(milliseconds);
+ qd_timer_visit_LH(milliseconds);
//
// Process listeners (incoming connections).
//
- thread_process_listeners(dx_server);
+ thread_process_listeners(qd_server);
//
// Traverse the list of connectors-needing-service from the proton driver.
@@ -474,21 +474,21 @@ static void *thread_run(void *arg)
// being processed by another thread, put it in the work queue and signal the
// condition variable.
//
- work = pn_driver_connector(dx_server->driver);
+ work = pn_driver_connector(qd_server->driver);
while (work) {
ctx = pn_connector_context(work);
if (!ctx->enqueued && ctx->owner_thread == CONTEXT_NO_OWNER) {
ctx->enqueued = 1;
- work_queue_put(dx_server->work_queue, work);
- sys_cond_signal(dx_server->cond);
+ work_queue_put(qd_server->work_queue, work);
+ sys_cond_signal(qd_server->cond);
}
- work = pn_driver_connector(dx_server->driver);
+ work = pn_driver_connector(qd_server->driver);
}
//
// Release our exclusive claim on pn_driver_wait.
//
- dx_server->a_thread_is_waiting = false;
+ qd_server->a_thread_is_waiting = false;
}
}
@@ -501,22 +501,22 @@ static void *thread_run(void *arg)
if (ctx->owner_thread == CONTEXT_NO_OWNER) {
ctx->owner_thread = thread->thread_id;
ctx->enqueued = 0;
- dx_server->threads_active++;
+ qd_server->threads_active++;
} else {
//
// This connector is being processed by another thread, re-queue it.
//
- work_queue_put(dx_server->work_queue, work);
+ work_queue_put(qd_server->work_queue, work);
work = 0;
}
}
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_unlock(qd_server->lock);
//
// Process the connector that we now have exclusive access to.
//
if (work) {
- int work_done = process_connector(dx_server, work);
+ int work_done = process_connector(qd_server, work);
//
// Check to see if the connector was closed during processing
@@ -533,27 +533,27 @@ static void *thread_run(void *arg)
if (ctx->connector) {
ctx->connector->ctx = 0;
ctx->connector->state = CXTR_STATE_CONNECTING;
- dx_timer_schedule(ctx->connector->timer, ctx->connector->delay);
+ qd_timer_schedule(ctx->connector->timer, ctx->connector->delay);
}
- sys_mutex_lock(dx_server->lock);
- DEQ_REMOVE(dx_server->connections, ctx);
- dx_log(module, LOG_DEBUG, "removed %s connection",
+ sys_mutex_lock(qd_server->lock);
+ DEQ_REMOVE(qd_server->connections, ctx);
+ qd_log(module, LOG_DEBUG, "removed %s connection",
ctx->connector ? "connector" : "listener");
- free_dx_connection_t(ctx);
+ free_qd_connection_t(ctx);
pn_connector_free(work);
if (conn)
pn_connection_free(conn);
- dx_server->threads_active--;
- sys_mutex_unlock(dx_server->lock);
+ qd_server->threads_active--;
+ sys_mutex_unlock(qd_server->lock);
} else {
//
// The connector lives on. Mark it as no longer owned by this thread.
//
- sys_mutex_lock(dx_server->lock);
+ sys_mutex_lock(qd_server->lock);
ctx->owner_thread = CONTEXT_NO_OWNER;
- dx_server->threads_active--;
- sys_mutex_unlock(dx_server->lock);
+ qd_server->threads_active--;
+ sys_mutex_unlock(qd_server->lock);
}
//
@@ -561,7 +561,7 @@ static void *thread_run(void *arg)
// in light of the processing that just occurred.
//
if (work_done)
- pn_driver_wakeup(dx_server->driver);
+ pn_driver_wakeup(qd_server->driver);
}
}
@@ -569,7 +569,7 @@ static void *thread_run(void *arg)
}
-static void thread_start(dx_thread_t *thread)
+static void thread_start(qd_thread_t *thread)
{
if (!thread)
return;
@@ -579,7 +579,7 @@ static void thread_start(dx_thread_t *th
}
-static void thread_cancel(dx_thread_t *thread)
+static void thread_cancel(qd_thread_t *thread)
{
if (!thread)
return;
@@ -589,7 +589,7 @@ static void thread_cancel(dx_thread_t *t
}
-static void thread_join(dx_thread_t *thread)
+static void thread_join(qd_thread_t *thread)
{
if (!thread)
return;
@@ -599,7 +599,7 @@ static void thread_join(dx_thread_t *thr
}
-static void thread_free(dx_thread_t *thread)
+static void thread_free(qd_thread_t *thread)
{
if (!thread)
return;
@@ -610,11 +610,11 @@ static void thread_free(dx_thread_t *thr
static void cxtr_try_open(void *context)
{
- dx_connector_t *ct = (dx_connector_t*) context;
+ qd_connector_t *ct = (qd_connector_t*) context;
if (ct->state != CXTR_STATE_CONNECTING)
return;
- dx_connection_t *ctx = new_dx_connection_t();
+ qd_connection_t *ctx = new_qd_connection_t();
DEQ_ITEM_INIT(ctx);
ctx->server = ct->server;
ctx->state = CONN_STATE_CONNECTING;
@@ -634,218 +634,218 @@ static void cxtr_try_open(void *context)
sys_mutex_lock(ct->server->lock);
ctx->pn_cxtr = pn_connector(ct->server->driver, ct->config->host, ct->config->port, (void*) ctx);
DEQ_INSERT_TAIL(ct->server->connections, ctx);
- dx_log(module, LOG_DEBUG, "added connector connection");
+ qd_log(module, LOG_DEBUG, "added connector connection");
sys_mutex_unlock(ct->server->lock);
ct->ctx = ctx;
ct->delay = 5000;
- dx_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
+ qd_log(module, LOG_TRACE, "Connecting to %s:%s", ct->config->host, ct->config->port);
}
-dx_server_t *dx_server(int thread_count, const char *container_name)
+qd_server_t *qd_server(int thread_count, const char *container_name)
{
int i;
- dx_server_t *dx_server = NEW(dx_server_t);
- if (dx_server == 0)
+ qd_server_t *qd_server = NEW(qd_server_t);
+ if (qd_server == 0)
return 0;
- DEQ_INIT(dx_server->connections);
- dx_server->thread_count = thread_count;
- dx_server->container_name = container_name;
- dx_server->driver = pn_driver();
- dx_server->start_handler = 0;
- dx_server->conn_handler = 0;
- dx_server->signal_handler = 0;
- dx_server->ufd_handler = 0;
- dx_server->start_context = 0;
- dx_server->signal_context = 0;
- dx_server->lock = sys_mutex();
- dx_server->cond = sys_cond();
+ DEQ_INIT(qd_server->connections);
+ qd_server->thread_count = thread_count;
+ qd_server->container_name = container_name;
+ qd_server->driver = pn_driver();
+ qd_server->start_handler = 0;
+ qd_server->conn_handler = 0;
+ qd_server->signal_handler = 0;
+ qd_server->ufd_handler = 0;
+ qd_server->start_context = 0;
+ qd_server->signal_context = 0;
+ qd_server->lock = sys_mutex();
+ qd_server->cond = sys_cond();
- dx_timer_initialize(dx_server->lock);
+ qd_timer_initialize(qd_server->lock);
- dx_server->threads = NEW_PTR_ARRAY(dx_thread_t, thread_count);
+ qd_server->threads = NEW_PTR_ARRAY(qd_thread_t, thread_count);
for (i = 0; i < thread_count; i++)
- dx_server->threads[i] = thread(dx_server, i);
+ qd_server->threads[i] = thread(qd_server, i);
- dx_server->work_queue = work_queue();
- DEQ_INIT(dx_server->pending_timers);
- dx_server->a_thread_is_waiting = false;
- dx_server->threads_active = 0;
- dx_server->pause_requests = 0;
- dx_server->threads_paused = 0;
- dx_server->pause_next_sequence = 0;
- dx_server->pause_now_serving = 0;
- dx_server->pending_signal = 0;
+ qd_server->work_queue = work_queue();
+ DEQ_INIT(qd_server->pending_timers);
+ qd_server->a_thread_is_waiting = false;
+ qd_server->threads_active = 0;
+ qd_server->pause_requests = 0;
+ qd_server->threads_paused = 0;
+ qd_server->pause_next_sequence = 0;
+ qd_server->pause_now_serving = 0;
+ qd_server->pending_signal = 0;
- dx_log(module, LOG_INFO, "Container Name: %s", dx_server->container_name);
+ qd_log(module, LOG_INFO, "Container Name: %s", qd_server->container_name);
- return dx_server;
+ return qd_server;
}
-void dx_server_free(dx_server_t *dx_server)
+void qd_server_free(qd_server_t *qd_server)
{
int i;
- if (!dx_server)
+ if (!qd_server)
return;
- for (i = 0; i < dx_server->thread_count; i++)
- thread_free(dx_server->threads[i]);
+ for (i = 0; i < qd_server->thread_count; i++)
+ thread_free(qd_server->threads[i]);
- work_queue_free(dx_server->work_queue);
+ work_queue_free(qd_server->work_queue);
- pn_driver_free(dx_server->driver);
- sys_mutex_free(dx_server->lock);
- sys_cond_free(dx_server->cond);
- free(dx_server);
+ pn_driver_free(qd_server->driver);
+ sys_mutex_free(qd_server->lock);
+ sys_cond_free(qd_server->cond);
+ free(qd_server);
}
-void dx_server_set_conn_handler(dx_dispatch_t *dx, dx_conn_handler_cb_t handler, void *handler_context)
+void qd_server_set_conn_handler(qd_dispatch_t *qd, qd_conn_handler_cb_t handler, void *handler_context)
{
- dx->server->conn_handler = handler;
- dx->server->conn_handler_context = handler_context;
+ qd->server->conn_handler = handler;
+ qd->server->conn_handler_context = handler_context;
}
-void dx_server_set_signal_handler(dx_dispatch_t *dx, dx_signal_handler_cb_t handler, void *context)
+void qd_server_set_signal_handler(qd_dispatch_t *qd, qd_signal_handler_cb_t handler, void *context)
{
- dx->server->signal_handler = handler;
- dx->server->signal_context = context;
+ qd->server->signal_handler = handler;
+ qd->server->signal_context = context;
}
-void dx_server_set_start_handler(dx_dispatch_t *dx, dx_thread_start_cb_t handler, void *context)
+void qd_server_set_start_handler(qd_dispatch_t *qd, qd_thread_start_cb_t handler, void *context)
{
- dx->server->start_handler = handler;
- dx->server->start_context = context;
+ qd->server->start_handler = handler;
+ qd->server->start_context = context;
}
-void dx_server_set_user_fd_handler(dx_dispatch_t *dx, dx_user_fd_handler_cb_t ufd_handler)
+void qd_server_set_user_fd_handler(qd_dispatch_t *qd, qd_user_fd_handler_cb_t ufd_handler)
{
- dx->server->ufd_handler = ufd_handler;
+ qd->server->ufd_handler = ufd_handler;
}
-void dx_server_run(dx_dispatch_t *dx)
+void qd_server_run(qd_dispatch_t *qd)
{
- dx_server_t *dx_server = dx->server;
+ qd_server_t *qd_server = qd->server;
int i;
- if (!dx_server)
+ if (!qd_server)
return;
- assert(dx_server->conn_handler); // Server can't run without a connection handler.
+ assert(qd_server->conn_handler); // Server can't run without a connection handler.
- for (i = 1; i < dx_server->thread_count; i++)
- thread_start(dx_server->threads[i]);
+ for (i = 1; i < qd_server->thread_count; i++)
+ thread_start(qd_server->threads[i]);
- dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count);
+ qd_log(module, LOG_INFO, "Operational, %d Threads Running", qd_server->thread_count);
- thread_run((void*) dx_server->threads[0]);
+ thread_run((void*) qd_server->threads[0]);
- for (i = 1; i < dx_server->thread_count; i++)
- thread_join(dx_server->threads[i]);
+ for (i = 1; i < qd_server->thread_count; i++)
+ thread_join(qd_server->threads[i]);
- dx_log(module, LOG_INFO, "Shut Down");
+ qd_log(module, LOG_INFO, "Shut Down");
}
-void dx_server_start(dx_dispatch_t *dx)
+void qd_server_start(qd_dispatch_t *qd)
{
- dx_server_t *dx_server = dx->server;
+ qd_server_t *qd_server = qd->server;
int i;
- if (!dx_server)
+ if (!qd_server)
return;
- assert(dx_server->conn_handler); // Server can't run without a connection handler.
+ assert(qd_server->conn_handler); // Server can't run without a connection handler.
- for (i = 0; i < dx_server->thread_count; i++)
- thread_start(dx_server->threads[i]);
+ for (i = 0; i < qd_server->thread_count; i++)
+ thread_start(qd_server->threads[i]);
- dx_log(module, LOG_INFO, "Operational, %d Threads Running", dx_server->thread_count);
+ qd_log(module, LOG_INFO, "Operational, %d Threads Running", qd_server->thread_count);
}
-void dx_server_stop(dx_dispatch_t *dx)
+void qd_server_stop(qd_dispatch_t *qd)
{
- dx_server_t *dx_server = dx->server;
+ qd_server_t *qd_server = qd->server;
int idx;
- sys_mutex_lock(dx_server->lock);
- for (idx = 0; idx < dx_server->thread_count; idx++)
- thread_cancel(dx_server->threads[idx]);
- sys_cond_signal_all(dx_server->cond);
- pn_driver_wakeup(dx_server->driver);
- sys_mutex_unlock(dx_server->lock);
-
- if (thread_server != dx_server) {
- for (idx = 0; idx < dx_server->thread_count; idx++)
- thread_join(dx_server->threads[idx]);
- dx_log(module, LOG_INFO, "Shut Down");
+ sys_mutex_lock(qd_server->lock);
+ for (idx = 0; idx < qd_server->thread_count; idx++)
+ thread_cancel(qd_server->threads[idx]);
+ sys_cond_signal_all(qd_server->cond);
+ pn_driver_wakeup(qd_server->driver);
+ sys_mutex_unlock(qd_server->lock);
+
+ if (thread_server != qd_server) {
+ for (idx = 0; idx < qd_server->thread_count; idx++)
+ thread_join(qd_server->threads[idx]);
+ qd_log(module, LOG_INFO, "Shut Down");
}
}
-void dx_server_signal(dx_dispatch_t *dx, int signum)
+void qd_server_signal(qd_dispatch_t *qd, int signum)
{
- dx_server_t *dx_server = dx->server;
+ qd_server_t *qd_server = qd->server;
- dx_server->pending_signal = signum;
- sys_cond_signal_all(dx_server->cond);
- pn_driver_wakeup(dx_server->driver);
+ qd_server->pending_signal = signum;
+ sys_cond_signal_all(qd_server->cond);
+ pn_driver_wakeup(qd_server->driver);
}
-void dx_server_pause(dx_dispatch_t *dx)
+void qd_server_pause(qd_dispatch_t *qd)
{
- dx_server_t *dx_server = dx->server;
+ qd_server_t *qd_server = qd->server;
- sys_mutex_lock(dx_server->lock);
+ sys_mutex_lock(qd_server->lock);
//
// Bump the request count to stop all the threads.
//
- dx_server->pause_requests++;
- int my_sequence = dx_server->pause_next_sequence++;
+ qd_server->pause_requests++;
+ int my_sequence = qd_server->pause_next_sequence++;
//
// Awaken all threads that are currently blocking.
//
- sys_cond_signal_all(dx_server->cond);
- pn_driver_wakeup(dx_server->driver);
+ sys_cond_signal_all(qd_server->cond);
+ pn_driver_wakeup(qd_server->driver);
//
// Wait for the paused thread count plus the number of threads requesting a pause to equal
// the total thread count. Also, don't exit the blocking loop until now_serving equals our
// sequence number. This ensures that concurrent pausers don't run at the same time.
//
- while ((dx_server->threads_paused + dx_server->pause_requests < dx_server->thread_count) ||
- (my_sequence != dx_server->pause_now_serving))
- sys_cond_wait(dx_server->cond, dx_server->lock);
+ while ((qd_server->threads_paused + qd_server->pause_requests < qd_server->thread_count) ||
+ (my_sequence != qd_server->pause_now_serving))
+ sys_cond_wait(qd_server->cond, qd_server->lock);
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_unlock(qd_server->lock);
}
-void dx_server_resume(dx_dispatch_t *dx)
+void qd_server_resume(qd_dispatch_t *qd)
{
- dx_server_t *dx_server = dx->server;
+ qd_server_t *qd_server = qd->server;
- sys_mutex_lock(dx_server->lock);
- dx_server->pause_requests--;
- dx_server->pause_now_serving++;
- sys_cond_signal_all(dx_server->cond);
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_lock(qd_server->lock);
+ qd_server->pause_requests--;
+ qd_server->pause_now_serving++;
+ sys_cond_signal_all(qd_server->cond);
+ sys_mutex_unlock(qd_server->lock);
}
-void dx_server_activate(dx_connection_t *ctx)
+void qd_server_activate(qd_connection_t *ctx)
{
if (!ctx)
return;
@@ -859,37 +859,37 @@ void dx_server_activate(dx_connection_t
}
-void dx_connection_set_context(dx_connection_t *conn, void *context)
+void qd_connection_set_context(qd_connection_t *conn, void *context)
{
conn->user_context = context;
}
-void *dx_connection_get_context(dx_connection_t *conn)
+void *qd_connection_get_context(qd_connection_t *conn)
{
return conn->user_context;
}
-void dx_connection_set_link_context(dx_connection_t *conn, void *context)
+void qd_connection_set_link_context(qd_connection_t *conn, void *context)
{
conn->link_context = context;
}
-void *dx_connection_get_link_context(dx_connection_t *conn)
+void *qd_connection_get_link_context(qd_connection_t *conn)
{
return conn->link_context;
}
-pn_connection_t *dx_connection_pn(dx_connection_t *conn)
+pn_connection_t *qd_connection_pn(qd_connection_t *conn)
{
return conn->pn_conn;
}
-const dx_server_config_t *dx_connection_config(const dx_connection_t *conn)
+const qd_server_config_t *qd_connection_config(const qd_connection_t *conn)
{
if (conn->listener)
return conn->listener->config;
@@ -897,66 +897,66 @@ const dx_server_config_t *dx_connection_
}
-dx_listener_t *dx_server_listen(dx_dispatch_t *dx, const dx_server_config_t *config, void *context)
+qd_listener_t *qd_server_listen(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
{
- dx_server_t *dx_server = dx->server;
- dx_listener_t *li = new_dx_listener_t();
+ qd_server_t *qd_server = qd->server;
+ qd_listener_t *li = new_qd_listener_t();
if (!li)
return 0;
- li->server = dx_server;
+ li->server = qd_server;
li->config = config;
li->context = context;
- li->pn_listener = pn_listener(dx_server->driver, config->host, config->port, (void*) li);
+ li->pn_listener = pn_listener(qd_server->driver, config->host, config->port, (void*) li);
if (!li->pn_listener) {
- dx_log(module, LOG_ERROR, "Driver Error %d (%s)",
- pn_driver_errno(dx_server->driver), pn_driver_error(dx_server->driver));
- free_dx_listener_t(li);
+ qd_log(module, LOG_ERROR, "Driver Error %d (%s)",
+ pn_driver_errno(qd_server->driver), pn_driver_error(qd_server->driver));
+ free_qd_listener_t(li);
return 0;
}
- dx_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port);
+ qd_log(module, LOG_TRACE, "Listening on %s:%s", config->host, config->port);
return li;
}
-void dx_server_listener_free(dx_listener_t* li)
+void qd_server_listener_free(qd_listener_t* li)
{
pn_listener_free(li->pn_listener);
- free_dx_listener_t(li);
+ free_qd_listener_t(li);
}
-void dx_server_listener_close(dx_listener_t* li)
+void qd_server_listener_close(qd_listener_t* li)
{
pn_listener_close(li->pn_listener);
}
-dx_connector_t *dx_server_connect(dx_dispatch_t *dx, const dx_server_config_t *config, void *context)
+qd_connector_t *qd_server_connect(qd_dispatch_t *qd, const qd_server_config_t *config, void *context)
{
- dx_server_t *dx_server = dx->server;
- dx_connector_t *ct = new_dx_connector_t();
+ qd_server_t *qd_server = qd->server;
+ qd_connector_t *ct = new_qd_connector_t();
if (!ct)
return 0;
- ct->server = dx_server;
+ ct->server = qd_server;
ct->state = CXTR_STATE_CONNECTING;
ct->config = config;
ct->context = context;
ct->ctx = 0;
- ct->timer = dx_timer(dx, cxtr_try_open, (void*) ct);
+ ct->timer = qd_timer(qd, cxtr_try_open, (void*) ct);
ct->delay = 0;
- dx_timer_schedule(ct->timer, ct->delay);
+ qd_timer_schedule(ct->timer, ct->delay);
return ct;
}
-void dx_server_connector_free(dx_connector_t* ct)
+void qd_server_connector_free(qd_connector_t* ct)
{
// Don't free the proton connector. This will be done by the connector
// processing/cleanup.
@@ -966,22 +966,22 @@ void dx_server_connector_free(dx_connect
ct->ctx->connector = 0;
}
- dx_timer_free(ct->timer);
- free_dx_connector_t(ct);
+ qd_timer_free(ct->timer);
+ free_qd_connector_t(ct);
}
-dx_user_fd_t *dx_user_fd(dx_dispatch_t *dx, int fd, void *context)
+qd_user_fd_t *qd_user_fd(qd_dispatch_t *qd, int fd, void *context)
{
- dx_server_t *dx_server = dx->server;
- dx_user_fd_t *ufd = new_dx_user_fd_t();
+ qd_server_t *qd_server = qd->server;
+ qd_user_fd_t *ufd = new_qd_user_fd_t();
if (!ufd)
return 0;
- dx_connection_t *ctx = new_dx_connection_t();
+ qd_connection_t *ctx = new_qd_connection_t();
DEQ_ITEM_INIT(ctx);
- ctx->server = dx_server;
+ ctx->server = qd_server;
ctx->state = CONN_STATE_USER;
ctx->owner_thread = CONTEXT_NO_OWNER;
ctx->enqueued = 0;
@@ -994,55 +994,55 @@ dx_user_fd_t *dx_user_fd(dx_dispatch_t *
ctx->ufd = ufd;
ufd->context = context;
- ufd->server = dx_server;
+ ufd->server = qd_server;
ufd->fd = fd;
- ufd->pn_conn = pn_connector_fd(dx_server->driver, fd, (void*) ctx);
- pn_driver_wakeup(dx_server->driver);
+ ufd->pn_conn = pn_connector_fd(qd_server->driver, fd, (void*) ctx);
+ pn_driver_wakeup(qd_server->driver);
return ufd;
}
-void dx_user_fd_free(dx_user_fd_t *ufd)
+void qd_user_fd_free(qd_user_fd_t *ufd)
{
pn_connector_close(ufd->pn_conn);
- free_dx_user_fd_t(ufd);
+ free_qd_user_fd_t(ufd);
}
-void dx_user_fd_activate_read(dx_user_fd_t *ufd)
+void qd_user_fd_activate_read(qd_user_fd_t *ufd)
{
pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_READABLE);
pn_driver_wakeup(ufd->server->driver);
}
-void dx_user_fd_activate_write(dx_user_fd_t *ufd)
+void qd_user_fd_activate_write(qd_user_fd_t *ufd)
{
pn_connector_activate(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
pn_driver_wakeup(ufd->server->driver);
}
-bool dx_user_fd_is_readable(dx_user_fd_t *ufd)
+bool qd_user_fd_is_readable(qd_user_fd_t *ufd)
{
return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_READABLE);
}
-bool dx_user_fd_is_writeable(dx_user_fd_t *ufd)
+bool qd_user_fd_is_writeable(qd_user_fd_t *ufd)
{
return pn_connector_activated(ufd->pn_conn, PN_CONNECTOR_WRITABLE);
}
-void dx_server_timer_pending_LH(dx_timer_t *timer)
+void qd_server_timer_pending_LH(qd_timer_t *timer)
{
DEQ_INSERT_TAIL(timer->server->pending_timers, timer);
}
-void dx_server_timer_cancel_LH(dx_timer_t *timer)
+void qd_server_timer_cancel_LH(qd_timer_t *timer)
{
DEQ_REMOVE(timer->server->pending_timers, timer);
}
@@ -1055,14 +1055,14 @@ static void server_schema_handler(void *
static void server_query_handler(void* context, const char *id, void *cor)
{
- dx_server_t *dx_server = (dx_server_t*) context;
- sys_mutex_lock(dx_server->lock);
+ qd_server_t *qd_server = (qd_server_t*) context;
+ sys_mutex_lock(qd_server->lock);
const char *conn_state;
- const dx_server_config_t *config;
+ const qd_server_config_t *config;
const char *pn_container_name;
const char *direction;
- dx_connection_t *conn = DEQ_HEAD(dx_server->connections);
+ qd_connection_t *conn = DEQ_HEAD(qd_server->connections);
while (conn) {
switch (conn->state) {
case CONN_STATE_CONNECTING: conn_state = "Connecting"; break;
@@ -1072,13 +1072,13 @@ static void server_query_handler(void* c
case CONN_STATE_USER: conn_state = "User"; break;
default: conn_state = "undefined"; break;
}
- dx_agent_value_string(cor, "state", conn_state);
+ qd_agent_value_string(cor, "state", conn_state);
// get remote container name using proton connection
pn_container_name = pn_connection_remote_container(conn->pn_conn);
if (pn_container_name)
- dx_agent_value_string(cor, "container", pn_container_name);
+ qd_agent_value_string(cor, "container", pn_container_name);
else
- dx_agent_value_null(cor, "container");
+ qd_agent_value_null(cor, "container");
// and now for some config entries
if (conn->connector) {
@@ -1088,25 +1088,25 @@ static void server_query_handler(void* c
strcpy(host, config->host);
strcat(host, ":");
strcat(host, config->port);
- dx_agent_value_string(cor, "host", host);
+ qd_agent_value_string(cor, "host", host);
} else {
config = conn->listener->config;
direction = "in";
- dx_agent_value_string(cor, "host", pn_connector_name(conn->pn_cxtr));
+ qd_agent_value_string(cor, "host", pn_connector_name(conn->pn_cxtr));
}
- dx_agent_value_string(cor, "sasl", config->sasl_mechanisms);
- dx_agent_value_string(cor, "role", config->role);
- dx_agent_value_string(cor, "dir", direction);
+ qd_agent_value_string(cor, "sasl", config->sasl_mechanisms);
+ qd_agent_value_string(cor, "role", config->role);
+ qd_agent_value_string(cor, "dir", direction);
conn = DEQ_NEXT(conn);
- dx_agent_value_complete(cor, conn != 0);
+ qd_agent_value_complete(cor, conn != 0);
}
- sys_mutex_unlock(dx_server->lock);
+ sys_mutex_unlock(qd_server->lock);
}
-void dx_server_setup_agent(dx_dispatch_t *dx)
+void qd_server_setup_agent(qd_dispatch_t *qd)
{
- dx_agent_register_class(dx, "org.apache.qpid.dispatch.connection", dx->server, server_schema_handler, server_query_handler);
+ qd_agent_register_class(qd, "org.apache.qpid.dispatch.connection", qd->server, server_schema_handler, server_query_handler);
}
Modified: qpid/dispatch/trunk/src/server_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/server_private.h?rev=1543534&r1=1543533&r2=1543534&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/server_private.h (original)
+++ qpid/dispatch/trunk/src/server_private.h Tue Nov 19 19:21:30 2013
@@ -28,8 +28,8 @@
#include <proton/engine.h>
#include <proton/driver_extras.h>
-void dx_server_timer_pending_LH(dx_timer_t *timer);
-void dx_server_timer_cancel_LH(dx_timer_t *timer);
+void qd_server_timer_pending_LH(qd_timer_t *timer);
+void qd_server_timer_cancel_LH(qd_timer_t *timer);
typedef enum {
@@ -48,57 +48,57 @@ typedef enum {
CXTR_STATE_FAILED
} cxtr_state_t;
-typedef struct dx_server_t dx_server_t;
+typedef struct qd_server_t qd_server_t;
-struct dx_listener_t {
- dx_server_t *server;
- const dx_server_config_t *config;
+struct qd_listener_t {
+ qd_server_t *server;
+ const qd_server_config_t *config;
void *context;
pn_listener_t *pn_listener;
};
-struct dx_connector_t {
- dx_server_t *server;
+struct qd_connector_t {
+ qd_server_t *server;
cxtr_state_t state;
- const dx_server_config_t *config;
+ const qd_server_config_t *config;
void *context;
- dx_connection_t *ctx;
- dx_timer_t *timer;
+ qd_connection_t *ctx;
+ qd_timer_t *timer;
long delay;
};
-struct dx_connection_t {
- DEQ_LINKS(dx_connection_t);
- dx_server_t *server;
+struct qd_connection_t {
+ DEQ_LINKS(qd_connection_t);
+ qd_server_t *server;
conn_state_t state;
int owner_thread;
int enqueued;
pn_connector_t *pn_cxtr;
pn_connection_t *pn_conn;
- dx_listener_t *listener;
- dx_connector_t *connector;
+ qd_listener_t *listener;
+ qd_connector_t *connector;
void *context; // Copy of context from listener or connector
void *user_context;
void *link_context; // Context shared by this connection's links
- dx_user_fd_t *ufd;
+ qd_user_fd_t *ufd;
};
-struct dx_user_fd_t {
- dx_server_t *server;
+struct qd_user_fd_t {
+ qd_server_t *server;
void *context;
int fd;
pn_connector_t *pn_conn;
};
-ALLOC_DECLARE(dx_listener_t);
-ALLOC_DECLARE(dx_connector_t);
-ALLOC_DECLARE(dx_connection_t);
-ALLOC_DECLARE(dx_user_fd_t);
+ALLOC_DECLARE(qd_listener_t);
+ALLOC_DECLARE(qd_connector_t);
+ALLOC_DECLARE(qd_connection_t);
+ALLOC_DECLARE(qd_user_fd_t);
-DEQ_DECLARE(dx_connection_t, dx_connection_list_t);
+DEQ_DECLARE(qd_connection_t, qd_connection_list_t);
#endif
Modified: qpid/dispatch/trunk/src/timer.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/timer.c?rev=1543534&r1=1543533&r2=1543534&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/timer.c (original)
+++ qpid/dispatch/trunk/src/timer.c Tue Nov 19 19:21:30 2013
@@ -27,18 +27,18 @@
#include <stdio.h>
static sys_mutex_t *lock;
-static dx_timer_list_t idle_timers;
-static dx_timer_list_t scheduled_timers;
+static qd_timer_list_t idle_timers;
+static qd_timer_list_t scheduled_timers;
static long time_base;
-ALLOC_DECLARE(dx_timer_t);
-ALLOC_DEFINE(dx_timer_t);
+ALLOC_DECLARE(qd_timer_t);
+ALLOC_DEFINE(qd_timer_t);
//=========================================================================
// Private static functions
//=========================================================================
-static void dx_timer_cancel_LH(dx_timer_t *timer)
+static void qd_timer_cancel_LH(qd_timer_t *timer)
{
switch (timer->state) {
case TIMER_FREE:
@@ -56,7 +56,7 @@ static void dx_timer_cancel_LH(dx_timer_
break;
case TIMER_PENDING:
- dx_server_timer_cancel_LH(timer);
+ qd_server_timer_cancel_LH(timer);
DEQ_INSERT_TAIL(idle_timers, timer);
break;
}
@@ -69,15 +69,15 @@ static void dx_timer_cancel_LH(dx_timer_
// Public Functions from timer.h
//=========================================================================
-dx_timer_t *dx_timer(dx_dispatch_t *dx, dx_timer_cb_t cb, void* context)
+qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context)
{
- dx_timer_t *timer = new_dx_timer_t();
+ qd_timer_t *timer = new_qd_timer_t();
if (!timer)
return 0;
DEQ_ITEM_INIT(timer);
- timer->server = dx ? dx->server : 0;
+ timer->server = qd ? qd->server : 0;
timer->handler = cb;
timer->context = context;
timer->delta_time = 0;
@@ -91,26 +91,26 @@ dx_timer_t *dx_timer(dx_dispatch_t *dx,
}
-void dx_timer_free(dx_timer_t *timer)
+void qd_timer_free(qd_timer_t *timer)
{
sys_mutex_lock(lock);
- dx_timer_cancel_LH(timer);
+ qd_timer_cancel_LH(timer);
DEQ_REMOVE(idle_timers, timer);
sys_mutex_unlock(lock);
timer->state = TIMER_FREE;
- free_dx_timer_t(timer);
+ free_qd_timer_t(timer);
}
-void dx_timer_schedule(dx_timer_t *timer, long duration)
+void qd_timer_schedule(qd_timer_t *timer, long duration)
{
- dx_timer_t *ptr;
- dx_timer_t *last;
+ qd_timer_t *ptr;
+ qd_timer_t *last;
long total_time;
sys_mutex_lock(lock);
- dx_timer_cancel_LH(timer); // Timer is now on the idle list
+ qd_timer_cancel_LH(timer); // Timer is now on the idle list
assert(timer->state == TIMER_IDLE);
DEQ_REMOVE(idle_timers, timer);
@@ -121,7 +121,7 @@ void dx_timer_schedule(dx_timer_t *timer
//
if (duration == 0) {
timer->state = TIMER_PENDING;
- dx_server_timer_pending_LH(timer);
+ qd_server_timer_pending_LH(timer);
sys_mutex_unlock(lock);
return;
}
@@ -165,10 +165,10 @@ void dx_timer_schedule(dx_timer_t *timer
}
-void dx_timer_cancel(dx_timer_t *timer)
+void qd_timer_cancel(qd_timer_t *timer)
{
sys_mutex_lock(lock);
- dx_timer_cancel_LH(timer);
+ qd_timer_cancel_LH(timer);
sys_mutex_unlock(lock);
}
@@ -177,7 +177,7 @@ void dx_timer_cancel(dx_timer_t *timer)
// Private Functions from timer_private.h
//=========================================================================
-void dx_timer_initialize(sys_mutex_t *server_lock)
+void qd_timer_initialize(sys_mutex_t *server_lock)
{
lock = server_lock;
DEQ_INIT(idle_timers);
@@ -186,25 +186,25 @@ void dx_timer_initialize(sys_mutex_t *se
}
-void dx_timer_finalize(void)
+void qd_timer_finalize(void)
{
lock = 0;
}
-long dx_timer_next_duration_LH(void)
+long qd_timer_next_duration_LH(void)
{
- dx_timer_t *timer = DEQ_HEAD(scheduled_timers);
+ qd_timer_t *timer = DEQ_HEAD(scheduled_timers);
if (timer)
return timer->delta_time;
return -1;
}
-void dx_timer_visit_LH(long current_time)
+void qd_timer_visit_LH(long current_time)
{
long delta;
- dx_timer_t *timer = DEQ_HEAD(scheduled_timers);
+ qd_timer_t *timer = DEQ_HEAD(scheduled_timers);
if (time_base == 0) {
time_base = current_time;
@@ -223,7 +223,7 @@ void dx_timer_visit_LH(long current_time
DEQ_REMOVE_HEAD(scheduled_timers);
delta -= timer->delta_time;
timer->state = TIMER_PENDING;
- dx_server_timer_pending_LH(timer);
+ qd_server_timer_pending_LH(timer);
}
timer = DEQ_HEAD(scheduled_timers);
@@ -231,7 +231,7 @@ void dx_timer_visit_LH(long current_time
}
-void dx_timer_idle_LH(dx_timer_t *timer)
+void qd_timer_idle_LH(qd_timer_t *timer)
{
timer->state = TIMER_IDLE;
DEQ_INSERT_TAIL(idle_timers, timer);
Modified: qpid/dispatch/trunk/src/timer_private.h
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/src/timer_private.h?rev=1543534&r1=1543533&r2=1543534&view=diff
==============================================================================
--- qpid/dispatch/trunk/src/timer_private.h (original)
+++ qpid/dispatch/trunk/src/timer_private.h Tue Nov 19 19:21:30 2013
@@ -29,25 +29,25 @@ typedef enum {
TIMER_IDLE,
TIMER_SCHEDULED,
TIMER_PENDING
-} dx_timer_state_t;
+} qd_timer_state_t;
-struct dx_timer_t {
- DEQ_LINKS(dx_timer_t);
- dx_server_t *server;
- dx_timer_cb_t handler;
+struct qd_timer_t {
+ DEQ_LINKS(qd_timer_t);
+ qd_server_t *server;
+ qd_timer_cb_t handler;
void *context;
long delta_time;
- dx_timer_state_t state;
+ qd_timer_state_t state;
};
-DEQ_DECLARE(dx_timer_t, dx_timer_list_t);
+DEQ_DECLARE(qd_timer_t, qd_timer_list_t);
-void dx_timer_initialize(sys_mutex_t *server_lock);
-void dx_timer_finalize(void);
-long dx_timer_next_duration_LH(void);
-void dx_timer_visit_LH(long current_time);
-void dx_timer_idle_LH(dx_timer_t *timer);
+void qd_timer_initialize(sys_mutex_t *server_lock);
+void qd_timer_finalize(void);
+long qd_timer_next_duration_LH(void);
+void qd_timer_visit_LH(long current_time);
+void qd_timer_idle_LH(qd_timer_t *timer);
#endif
Modified: qpid/dispatch/trunk/tests/alloc_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/alloc_test.c?rev=1543534&r1=1543533&r2=1543534&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/alloc_test.c (original)
+++ qpid/dispatch/trunk/tests/alloc_test.c Tue Nov 19 19:21:30 2013
@@ -27,13 +27,13 @@ typedef struct {
int B;
} object_t;
-dx_alloc_config_t config = {3, 7, 10};
+qd_alloc_config_t config = {3, 7, 10};
ALLOC_DECLARE(object_t);
ALLOC_DEFINE_CONFIG(object_t, sizeof(object_t), 0, &config);
-static char* check_stats(dx_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg)
+static char* check_stats(qd_alloc_stats_t *stats, uint64_t ah, uint64_t fh, uint64_t ht, uint64_t rt, uint64_t rg)
{
if (stats->total_alloc_from_heap != ah) return "Incorrect alloc-from-heap";
if (stats->total_free_to_heap != fh) return "Incorrect free-to-heap";
@@ -48,7 +48,7 @@ static char* test_alloc_basic(void *cont
{
object_t *obj[50];
int idx;
- dx_alloc_stats_t *stats;
+ qd_alloc_stats_t *stats;
char *error;
for (idx = 0; idx < 20; idx++)
@@ -77,7 +77,7 @@ static char* test_alloc_basic(void *cont
int alloc_tests(void)
{
int result = 0;
- dx_alloc_initialize();
+ qd_alloc_initialize();
TEST_CASE(test_alloc_basic, 0);
Modified: qpid/dispatch/trunk/tests/compose_test.c
URL: http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/compose_test.c?rev=1543534&r1=1543533&r2=1543534&view=diff
==============================================================================
--- qpid/dispatch/trunk/tests/compose_test.c (original)
+++ qpid/dispatch/trunk/tests/compose_test.c Tue Nov 19 19:21:30 2013
@@ -86,34 +86,34 @@ static int vector0_length = 302;
static char *test_compose_list_of_maps(void *context)
{
- dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
- dx_compose_start_list(field);
+ qd_compose_start_list(field);
- dx_compose_start_map(field);
- dx_compose_insert_string(field, "key001");
- dx_compose_insert_uint(field, 10);
- dx_compose_insert_string(field, "key002");
- dx_compose_insert_uint(field, 11);
- dx_compose_end_map(field);
+ qd_compose_start_map(field);
+ qd_compose_insert_string(field, "key001");
+ qd_compose_insert_uint(field, 10);
+ qd_compose_insert_string(field, "key002");
+ qd_compose_insert_uint(field, 11);
+ qd_compose_end_map(field);
for (int j = 0; j < 9; j++) {
- dx_compose_start_map(field);
- dx_compose_insert_string(field, "key001");
- dx_compose_insert_uint(field, 20);
- dx_compose_insert_string(field, "key002");
- dx_compose_insert_uint(field, 21);
- dx_compose_end_map(field);
+ qd_compose_start_map(field);
+ qd_compose_insert_string(field, "key001");
+ qd_compose_insert_uint(field, 20);
+ qd_compose_insert_string(field, "key002");
+ qd_compose_insert_uint(field, 21);
+ qd_compose_end_map(field);
}
- dx_compose_end_list(field);
+ qd_compose_end_list(field);
- dx_buffer_t *buf = DEQ_HEAD(field->buffers);
+ qd_buffer_t *buf = DEQ_HEAD(field->buffers);
- if (dx_buffer_size(buf) != vector0_length) return "Incorrect Length of Buffer";
+ if (qd_buffer_size(buf) != vector0_length) return "Incorrect Length of Buffer";
char *left = vector0;
- char *right = (char*) dx_buffer_base(buf);
+ char *right = (char*) qd_buffer_base(buf);
int idx;
for (idx = 0; idx < vector0_length; idx++) {
@@ -122,7 +122,7 @@ static char *test_compose_list_of_maps(v
right++;
}
- dx_compose_free(field);
+ qd_compose_free(field);
return 0;
}
@@ -143,32 +143,32 @@ static int vector1_length = 69;
static char *test_compose_nested_composites(void *context)
{
- dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_DELIVERY_ANNOTATIONS, 0);
- dx_compose_start_map(field);
+ qd_compose_start_map(field);
- dx_compose_insert_string(field, "key001");
- dx_compose_insert_uint(field, 10);
+ qd_compose_insert_string(field, "key001");
+ qd_compose_insert_uint(field, 10);
- dx_compose_insert_string(field, "key002");
- dx_compose_start_list(field);
+ qd_compose_insert_string(field, "key002");
+ qd_compose_start_list(field);
- dx_compose_insert_string(field, "item1");
- dx_compose_insert_string(field, "item2");
- dx_compose_insert_string(field, "item3");
+ qd_compose_insert_string(field, "item1");
+ qd_compose_insert_string(field, "item2");
+ qd_compose_insert_string(field, "item3");
- dx_compose_start_list(field);
- dx_compose_end_list(field);
+ qd_compose_start_list(field);
+ qd_compose_end_list(field);
- dx_compose_end_list(field);
- dx_compose_end_map(field);
+ qd_compose_end_list(field);
+ qd_compose_end_map(field);
- dx_buffer_t *buf = DEQ_HEAD(field->buffers);
+ qd_buffer_t *buf = DEQ_HEAD(field->buffers);
- if (dx_buffer_size(buf) != vector1_length) return "Incorrect Length of Buffer";
+ if (qd_buffer_size(buf) != vector1_length) return "Incorrect Length of Buffer";
char *left = vector1;
- char *right = (char*) dx_buffer_base(buf);
+ char *right = (char*) qd_buffer_base(buf);
int idx;
for (idx = 0; idx < vector1_length; idx++) {
@@ -177,7 +177,7 @@ static char *test_compose_nested_composi
right++;
}
- dx_compose_free(field);
+ qd_compose_free(field);
return 0;
}
@@ -218,53 +218,53 @@ static int vector2_length = 139;
static char *test_compose_scalars(void *context)
{
- dx_composed_field_t *field = dx_compose(DX_PERFORMATIVE_PROPERTIES, 0);
+ qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
- dx_compose_start_list(field);
+ qd_compose_start_list(field);
- dx_compose_insert_null(field);
+ qd_compose_insert_null(field);
- dx_compose_insert_bool(field, 0);
- dx_compose_insert_bool(field, 1);
+ qd_compose_insert_bool(field, 0);
+ qd_compose_insert_bool(field, 1);
- dx_compose_insert_uint(field, 0);
- dx_compose_insert_uint(field, 1);
- dx_compose_insert_uint(field, 255);
- dx_compose_insert_uint(field, 256);
- dx_compose_insert_uint(field, 0x10000000);
-
- dx_compose_insert_ulong(field, 0);
- dx_compose_insert_ulong(field, 1);
- dx_compose_insert_ulong(field, 255);
- dx_compose_insert_ulong(field, 256);
- dx_compose_insert_ulong(field, 0x20000000);
-
- dx_compose_insert_int(field, 0);
- dx_compose_insert_int(field, 1);
- dx_compose_insert_int(field, -1);
- dx_compose_insert_int(field, 255);
- dx_compose_insert_int(field, 256);
-
- dx_compose_insert_long(field, 0);
- dx_compose_insert_long(field, 1);
- dx_compose_insert_long(field, -1);
- dx_compose_insert_long(field, 255);
- dx_compose_insert_long(field, 256);
-
- dx_compose_insert_timestamp(field, 0x0011223344556677);
- dx_compose_insert_uuid(field, (uint8_t*) "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x00");
- dx_compose_insert_binary(field, (uint8_t*) "\x00\x11", 2);
- dx_compose_insert_string(field, "string");
- dx_compose_insert_symbol(field, "symbol");
+ qd_compose_insert_uint(field, 0);
+ qd_compose_insert_uint(field, 1);
+ qd_compose_insert_uint(field, 255);
+ qd_compose_insert_uint(field, 256);
+ qd_compose_insert_uint(field, 0x10000000);
+
+ qd_compose_insert_ulong(field, 0);
+ qd_compose_insert_ulong(field, 1);
+ qd_compose_insert_ulong(field, 255);
+ qd_compose_insert_ulong(field, 256);
+ qd_compose_insert_ulong(field, 0x20000000);
+
+ qd_compose_insert_int(field, 0);
+ qd_compose_insert_int(field, 1);
+ qd_compose_insert_int(field, -1);
+ qd_compose_insert_int(field, 255);
+ qd_compose_insert_int(field, 256);
+
+ qd_compose_insert_long(field, 0);
+ qd_compose_insert_long(field, 1);
+ qd_compose_insert_long(field, -1);
+ qd_compose_insert_long(field, 255);
+ qd_compose_insert_long(field, 256);
+
+ qd_compose_insert_timestamp(field, 0x0011223344556677);
+ qd_compose_insert_uuid(field, (uint8_t*) "\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x00");
+ qd_compose_insert_binary(field, (uint8_t*) "\x00\x11", 2);
+ qd_compose_insert_string(field, "string");
+ qd_compose_insert_symbol(field, "symbol");
- dx_compose_end_list(field);
+ qd_compose_end_list(field);
- dx_buffer_t *buf = DEQ_HEAD(field->buffers);
+ qd_buffer_t *buf = DEQ_HEAD(field->buffers);
- if (dx_buffer_size(buf) != vector2_length) return "Incorrect Length of Buffer";
+ if (qd_buffer_size(buf) != vector2_length) return "Incorrect Length of Buffer";
char *left = vector2;
- char *right = (char*) dx_buffer_base(buf);
+ char *right = (char*) qd_buffer_base(buf);
int idx;
for (idx = 0; idx < vector2_length; idx++) {
@@ -273,7 +273,7 @@ static char *test_compose_scalars(void *
right++;
}
- dx_compose_free(field);
+ qd_compose_free(field);
return 0;
}
@@ -281,7 +281,7 @@ static char *test_compose_scalars(void *
int compose_tests()
{
int result = 0;
- dx_log_set_mask(LOG_NONE);
+ qd_log_set_mask(LOG_NONE);
TEST_CASE(test_compose_list_of_maps, 0);
TEST_CASE(test_compose_nested_composites, 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org