You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/09/19 04:31:17 UTC
svn commit: r1387437 - in /qpid/proton/branches/driver_abstraction/proton-c:
bindings/python/ include/proton/ src/ src/engine/ src/message/ src/pollers/
Author: kgiusti
Date: Wed Sep 19 02:31:16 2012
New Revision: 1387437
URL: http://svn.apache.org/viewvc?rev=1387437&view=rev
Log:
PROTON-2: merge latest trunk into branch
Added:
qpid/proton/branches/driver_abstraction/proton-c/bindings/python/proton.py
- copied unchanged from r1387433, qpid/proton/trunk/proton-c/bindings/python/proton.py
Modified:
qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt
qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h
qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h
qpid/proton/branches/driver_abstraction/proton-c/src/driver.c
qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h
qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c
qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c
qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c
qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c
Modified: qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/bindings/python/CMakeLists.txt Wed Sep 19 02:31:16 2012
@@ -15,9 +15,16 @@ install(CODE "execute_process(COMMAND ${
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile cproton.py
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})")
+install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -m py_compile proton.py
+ WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})")
+install(CODE "execute_process(COMMAND ${PYTHON_EXECUTABLE} -O -m py_compile proton.py
+ WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})")
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/cproton.py
${CMAKE_CURRENT_BINARY_DIR}/cproton.pyc
${CMAKE_CURRENT_BINARY_DIR}/cproton.pyo
+ ${CMAKE_CURRENT_SOURCE_DIR}/proton.py
+ ${CMAKE_CURRENT_SOURCE_DIR}/proton.pyc
+ ${CMAKE_CURRENT_SOURCE_DIR}/proton.pyo
DESTINATION ${PYTHON_SITEARCH_PACKAGES}
COMPONENT Python)
install(FILES ${CMAKE_CURRENT_BINARY_DIR}/_cproton.so
Modified: qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/include/proton/engine.h Wed Sep 19 02:31:16 2012
@@ -264,6 +264,8 @@ pn_connection_t *pn_get_connection(pn_se
void pn_session_open(pn_session_t *session);
void pn_session_close(pn_session_t *session);
void pn_session_free(pn_session_t *session);
+void *pn_session_context(pn_session_t *session);
+void pn_session_set_context(pn_session_t *session, void *context);
// link
const char *pn_link_name(pn_link_t *link);
@@ -291,6 +293,8 @@ pn_delivery_t *pn_unsettled_next(pn_deli
void pn_link_open(pn_link_t *sender);
void pn_link_close(pn_link_t *sender);
void pn_link_free(pn_link_t *sender);
+void *pn_link_context(pn_link_t *link);
+void pn_link_set_context(pn_link_t *link, void *context);
// sender
//void pn_offer(pn_sender_t *sender, int credits);
@@ -319,6 +323,8 @@ void pn_disposition(pn_delivery_t *deliv
//int pn_format(pn_delivery_t *delivery);
void pn_settle(pn_delivery_t *delivery);
void pn_delivery_dump(pn_delivery_t *delivery);
+void *pn_delivery_context(pn_delivery_t *delivery);
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context);
#ifdef __cplusplus
}
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/driver-internal.h Wed Sep 19 02:31:16 2012
@@ -103,7 +103,10 @@ struct pn_connector_t {
int pn_connector_poller_init( struct pn_connector_t *);
void pn_connector_poller_destroy( struct pn_connector_t *);
-void pn_driver_poller_wait(struct pn_driver_t *, int timeout_ms);
+//void pn_driver_poller_wait(struct pn_driver_t *, int timeout_ms);
+void pn_driver_poller_wait_1(pn_driver_t *);
+void pn_driver_poller_wait_2(pn_driver_t *, int);
+void pn_driver_poller_wait_3(pn_driver_t *);
int pn_io_handler(pn_connector_t *);
int pn_null_io_handler(pn_connector_t *);
void pn_connector_process_output(pn_connector_t *);
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/driver.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/driver.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/driver.c Wed Sep 19 02:31:16 2012
@@ -587,11 +587,36 @@ void pn_driver_wakeup(pn_driver_t *d)
}
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+// temporary workaround for a multi-threading problem. A multi-threaded
+// application must hold a lock on parts 1 and 3, but not on part 2.
+// This temporary change, which is not reflected in the driver's API, allows
+// a multi-threaded application to use the three parts separately.
+//
+// This workaround will eventually be replaced by a more elegant solution
+// to the problem.
+//
+void pn_driver_wait_1(pn_driver_t *d)
+{
+ pn_driver_poller_wait_1(d);
+}
+
+void pn_driver_wait_2(pn_driver_t *d, int timeout)
+{
+ pn_driver_poller_wait_2(d, timeout);
+}
+
+void pn_driver_wait_3(pn_driver_t *d)
+{
+ pn_driver_poller_wait_3(d);
+}
+
void pn_driver_wait(pn_driver_t *d, int timeout)
{
- pn_driver_poller_wait(d, timeout);
- d->listener_next = d->listener_head;
- d->connector_next = d->connector_head;
+ pn_driver_wait_1(d);
+ pn_driver_wait_2(d, timeout);
+ pn_driver_wait_3(d);
}
pn_listener_t *pn_driver_listener(pn_driver_t *d) {
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine-internal.h Wed Sep 19 02:31:16 2012
@@ -138,6 +138,7 @@ struct pn_session_t {
size_t link_capacity;
size_t link_count;
size_t id;
+ void *context;
};
struct pn_link_t {
@@ -159,6 +160,7 @@ struct pn_link_t {
bool drain;
bool drained; // sender only
size_t id;
+ void *context;
};
struct pn_delivery_t {
@@ -182,6 +184,7 @@ struct pn_delivery_t {
bool tpwork;
pn_buffer_t *bytes;
bool done;
+ void *transport_context;
void *context;
};
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/engine/engine.c Wed Sep 19 02:31:16 2012
@@ -296,6 +296,18 @@ void pn_session_free(pn_session_t *sessi
free(session);
}
+void *pn_session_context(pn_session_t *session)
+{
+ return session ? session->context : 0;
+}
+
+void pn_session_set_context(pn_session_t *session, void *context)
+{
+ if (session)
+ session->context = context;
+}
+
+
void pn_add_link(pn_session_t *ssn, pn_link_t *link)
{
PN_ENSURE(ssn->links, ssn->link_capacity, ssn->link_count + 1);
@@ -361,6 +373,17 @@ void pn_link_free(pn_link_t *link)
free(link);
}
+void *pn_link_context(pn_link_t *link)
+{
+ return link ? link->context : 0;
+}
+
+void pn_link_set_context(pn_link_t *link, void *context)
+{
+ if (link)
+ link->context = context;
+}
+
void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn)
{
endpoint->type = type;
@@ -641,6 +664,7 @@ pn_session_t *pn_session(pn_connection_t
ssn->links = NULL;
ssn->link_capacity = 0;
ssn->link_count = 0;
+ ssn->context = 0;
return ssn;
}
@@ -793,6 +817,7 @@ void pn_link_init(pn_link_t *link, int t
link->queued = 0;
link->drain = false;
link->drained = false;
+ link->context = 0;
}
const char *pn_source(pn_link_t *link)
@@ -933,6 +958,7 @@ pn_delivery_t *pn_delivery(pn_link_t *li
delivery->tpwork = false;
pn_buffer_clear(delivery->bytes);
delivery->done = false;
+ delivery->transport_context = NULL;
delivery->context = NULL;
if (!link->current)
@@ -979,6 +1005,17 @@ void pn_delivery_dump(pn_delivery_t *d)
pn_readable(d), d->work);
}
+void *pn_delivery_context(pn_delivery_t *delivery)
+{
+ return delivery ? delivery->context : NULL;
+}
+
+void pn_delivery_set_context(pn_delivery_t *delivery, void *context)
+{
+ if (delivery)
+ delivery->context = context;
+}
+
pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
{
if (delivery) {
@@ -1052,8 +1089,8 @@ void pn_real_settle(pn_delivery_t *deliv
void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
{
- pn_delivery_state_t *state = delivery->context;
- delivery->context = NULL;
+ pn_delivery_state_t *state = delivery->transport_context;
+ delivery->transport_context = NULL;
if (state) state->delivery = NULL;
pn_real_settle(delivery);
if (state) pn_delivery_buffer_gc(db);
@@ -1247,7 +1284,7 @@ int pn_do_transfer(pn_dispatcher_t *disp
delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
- delivery->context = state;
+ delivery->transport_context = state;
if (id != state->id) {
int err = pn_do_error(transport, "amqp:session:invalid-field",
"sequencing error, expected delivery-id %u, got %u",
@@ -1540,7 +1577,7 @@ bool pn_delivery_buffered(pn_delivery_t
{
if (delivery->settled) return false;
if (pn_is_sender(delivery->link)) {
- pn_delivery_state_t *state = delivery->context;
+ pn_delivery_state_t *state = delivery->transport_context;
if (state) {
return (delivery->done && !state->sent) || pn_buffer_size(delivery->bytes) > 0;
} else {
@@ -1656,7 +1693,7 @@ int pn_post_disp(pn_transport_t *transpo
pn_link_t *link = delivery->link;
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
// XXX: check for null state
- pn_delivery_state_t *state = delivery->context;
+ pn_delivery_state_t *state = delivery->transport_context;
uint64_t code;
switch(delivery->local_state) {
case PN_ACCEPTED:
@@ -1684,10 +1721,10 @@ int pn_process_tpwork_sender(pn_transpor
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) {
- pn_delivery_state_t *state = delivery->context;
+ pn_delivery_state_t *state = delivery->transport_context;
if (!state && pn_delivery_buffer_available(&ssn_state->outgoing)) {
state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
- delivery->context = state;
+ delivery->transport_context = state;
}
if (state && !state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) &&
@@ -1712,7 +1749,7 @@ int pn_process_tpwork_sender(pn_transpor
}
}
- pn_delivery_state_t *state = delivery->context;
+ pn_delivery_state_t *state = delivery->transport_context;
// XXX: need to prevent duplicate disposition sending
if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
&& state && state->sent) {
@@ -1732,7 +1769,7 @@ int pn_process_tpwork_receiver(pn_transp
pn_link_t *link = delivery->link;
// XXX: need to prevent duplicate disposition sending
pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
- if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->context) {
+ if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && delivery->transport_context) {
int err = pn_post_disp(transport, delivery);
if (err) return err;
}
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/message/message.c Wed Sep 19 02:31:16 2012
@@ -445,7 +445,7 @@ int pn_message_decode(pn_message_t *msg,
{
pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
group_id, reply_to_group_id;
- err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSiS]", &user_id, &address,
+ err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSIS]", &user_id, &address,
&subject, &reply_to, &ctype, &cencoding,
&msg->expiry_time, &msg->creation_time, &group_id,
&msg->group_sequence, &reply_to_group_id);
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/pollers/poll.c Wed Sep 19 02:31:16 2012
@@ -96,7 +96,7 @@ void pn_connector_poller_destroy( struct
c->poller = NULL;
}
-
+#if 0 // save for now
void pn_driver_poller_wait(pn_driver_t *d, int timeout)
{
pn_driver_poller_t *poller = d->poller;
@@ -166,3 +166,102 @@ void pn_driver_poller_wait(pn_driver_t *
c = c->connector_next;
}
}
+#endif
+
+//
+// XXX - pn_driver_wait has been divided into three internal functions as a
+// temporary workaround for a multi-threading problem. A multi-threaded
+// application must hold a lock on parts 1 and 3, but not on part 2.
+// This temporary change, which is not reflected in the driver's API, allows
+// a multi-threaded application to use the three parts separately.
+//
+// This workaround will eventually be replaced by a more elegant solution
+// to the problem.
+//
+
+static void pn_driver_poller_rebuild(pn_driver_t *d)
+{
+ pn_driver_poller_t *poller = d->poller;
+ size_t size = d->listener_count + d->connector_count;
+ while (poller->capacity < size + 1) {
+ poller->capacity = poller->capacity ? 2*poller->capacity : 16;
+ poller->fds = realloc(poller->fds, poller->capacity*sizeof(struct pollfd));
+ }
+
+ poller->nfds = 0;
+
+ poller->fds[poller->nfds].fd = d->ctrl[0];
+ poller->fds[poller->nfds].events = POLLIN;
+ poller->fds[poller->nfds].revents = 0;
+ poller->nfds++;
+
+ pn_listener_t *l = d->listener_head;
+ for (int i = 0; i < d->listener_count; i++) {
+ poller->fds[poller->nfds].fd = l->fd;
+ poller->fds[poller->nfds].events = POLLIN;
+ poller->fds[poller->nfds].revents = 0;
+ l->poller->idx = poller->nfds;
+ poller->nfds++;
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ for (int i = 0; i < d->connector_count; i++)
+ {
+ if (!c->closed) {
+ poller->fds[poller->nfds].fd = c->fd;
+ poller->fds[poller->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
+ (c->status & PN_SEL_WR ? POLLOUT : 0);
+ poller->fds[poller->nfds].revents = 0;
+ c->poller->idx = poller->nfds;
+ poller->nfds++;
+ }
+ c = c->connector_next;
+ }
+}
+
+void pn_driver_poller_wait_1(pn_driver_t *d)
+{
+ pn_driver_poller_rebuild(d);
+}
+
+void pn_driver_poller_wait_2(pn_driver_t *d, int timeout)
+{
+ pn_driver_poller_t *poller = d->poller;
+ DIE_IFE(poll(poller->fds, poller->nfds, d->closed_count > 0 ? 0 : timeout));
+}
+
+void pn_driver_poller_wait_3(pn_driver_t *d)
+{
+ pn_driver_poller_t *poller = d->poller;
+
+ if (poller->fds[0].revents & POLLIN) {
+ //clear the pipe
+ char buffer[512];
+ while (read(d->ctrl[0], buffer, 512) == 512);
+ }
+
+ pn_listener_t *l = d->listener_head;
+ while (l) {
+ int idx = l->poller->idx;
+ l->pending = (idx && poller->fds[idx].revents & POLLIN);
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ while (c) {
+ if (c->closed) {
+ c->pending_read = false;
+ c->pending_write = false;
+ c->pending_tick = false;
+ } else {
+ int idx = c->poller->idx;
+ c->pending_read = (idx && poller->fds[idx].revents & POLLIN);
+ c->pending_write = (idx && poller->fds[idx].revents & POLLOUT);
+ }
+ c = c->connector_next;
+ }
+
+ d->listener_next = d->listener_head;
+ d->connector_next = d->connector_head;
+}
Modified: qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c?rev=1387437&r1=1387436&r2=1387437&view=diff
==============================================================================
--- qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c (original)
+++ qpid/proton/branches/driver_abstraction/proton-c/src/pollers/select.c Wed Sep 19 02:31:16 2012
@@ -78,7 +78,7 @@ void pn_connector_poller_destroy( struct
{
}
-
+#if 0 // save it for now
void pn_driver_poller_wait(pn_driver_t *d, int timeout)
{
pn_driver_poller_t *poller = d->poller;
@@ -147,3 +147,83 @@ void pn_driver_poller_wait(pn_driver_t *
}
}
}
+#endif
+
+void pn_driver_poller_wait_1(pn_driver_t *d)
+{
+ pn_driver_poller_t *poller = d->poller;
+
+ // setup the select
+ FD_ZERO(&poller->readfds);
+ FD_ZERO(&poller->writefds);
+
+ FD_SET(d->ctrl[0], &poller->readfds);
+ poller->max_fds = d->ctrl[0];
+
+ pn_listener_t *l = d->listener_head;
+ for (int i = 0; i < d->listener_count; i++) {
+ FD_SET(l->fd, &poller->readfds);
+ if (l->fd > poller->max_fds) poller->max_fds = l->fd;
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ for (int i = 0; i < d->connector_count; i++) {
+ if (!c->closed && (c->status & (PN_SEL_RD|PN_SEL_WR))) {
+ if (c->status & PN_SEL_RD)
+ FD_SET(c->fd, &poller->readfds);
+ if (c->status & PN_SEL_WR)
+ FD_SET(c->fd, &poller->writefds);
+ if (c->fd > poller->max_fds) poller->max_fds = c->fd;
+ }
+ c = c->connector_next;
+ }
+}
+
+void pn_driver_poller_wait_2(pn_driver_t *d, int timeout)
+{
+ pn_driver_poller_t *poller = d->poller;
+
+ struct timeval to = {0};
+ if (timeout > 0) {
+ // convert millisecs to sec and usec:
+ to.tv_sec = timeout/1000;
+ to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
+ }
+
+ int nfds = select(poller->max_fds + 1, &poller->readfds, &poller->writefds, NULL, timeout < 0 ? NULL : &to);
+ DIE_IFE(nfds);
+}
+
+void pn_driver_poller_wait_3(pn_driver_t *d)
+{
+ pn_driver_poller_t *poller = d->poller;
+
+ if (FD_ISSET(d->ctrl[0], &poller->readfds)) {
+ //clear the pipe
+ char buffer[512];
+ while (read(d->ctrl[0], buffer, 512) == 512);
+ }
+
+ pn_listener_t *l = d->listener_head;
+ while (l) {
+ l->pending = FD_ISSET(l->fd, &poller->readfds);
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ while (c) {
+ if (c->closed) {
+ c->pending_read = false;
+ c->pending_write = false;
+ c->pending_tick = false;
+ } else {
+ c->pending_read = FD_ISSET(c->fd, &poller->readfds);
+ c->pending_write = FD_ISSET(c->fd, &poller->writefds);
+ }
+ c = c->connector_next;
+ }
+
+ d->listener_next = d->listener_head;
+ d->connector_next = d->connector_head;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org