You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/07/10 19:58:49 UTC
svn commit: r1359810 - in /qpid/proton/branches/libevent_driver: ./ proton-c/src/driver.c
Author: kgiusti
Date: Tue Jul 10 17:58:48 2012
New Revision: 1359810
URL: http://svn.apache.org/viewvc?rev=1359810&view=rev
Log:
NO-JIRA: use select() model instead of poll for portability.
Modified:
qpid/proton/branches/libevent_driver/ (props changed)
qpid/proton/branches/libevent_driver/proton-c/src/driver.c
Propchange: qpid/proton/branches/libevent_driver/
------------------------------------------------------------------------------
svn:mergeinfo = /qpid/proton/trunk:1357874-1359740
Modified: qpid/proton/branches/libevent_driver/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/libevent_driver/proton-c/src/driver.c?rev=1359810&r1=1359809&r2=1359810&view=diff
==============================================================================
--- qpid/proton/branches/libevent_driver/proton-c/src/driver.c (original)
+++ qpid/proton/branches/libevent_driver/proton-c/src/driver.c Tue Jul 10 17:58:48 2012
@@ -21,7 +21,7 @@
#define _POSIX_C_SOURCE 1
-#include <poll.h>
+//#include <poll.h>
#include <stdio.h>
#include <time.h>
#include <ctype.h>
@@ -29,6 +29,7 @@
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
+#include <sys/select.h>
#include <proton/driver.h>
#include <proton/sasl.h>
@@ -50,9 +51,9 @@ struct pn_driver_t {
size_t listener_count;
size_t connector_count;
size_t closed_count;
- size_t capacity;
- struct pollfd *fds;
- size_t nfds;
+ fd_set readfds;
+ fd_set writefds;
+ int max_fds;
int ctrl[2]; //pipe for updating selectable status
pn_trace_t trace;
};
@@ -688,9 +689,7 @@ pn_driver_t *pn_driver()
d->listener_count = 0;
d->connector_count = 0;
d->closed_count = 0;
- d->capacity = 0;
- d->fds = NULL;
- d->nfds = 0;
+ d->max_fds = 0;
d->ctrl[0] = 0;
d->ctrl[1] = 0;
d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
@@ -720,7 +719,6 @@ void pn_driver_free(pn_driver_t *d)
pn_connector_free(d->connector_head);
while (d->listener_head)
pn_listener_free(d->listener_head);
- free(d->fds);
free(d);
}
@@ -733,73 +731,73 @@ void pn_driver_wakeup(pn_driver_t *d)
static void pn_driver_rebuild(pn_driver_t *d)
{
- size_t size = d->listener_count + d->connector_count;
- while (d->capacity < size + 1) {
- d->capacity = d->capacity ? 2*d->capacity : 16;
- d->fds = realloc(d->fds, d->capacity*sizeof(struct pollfd));
- }
+ d->max_fds = -1;
+ FD_ZERO(&d->readfds);
+ FD_ZERO(&d->writefds);
- d->nfds = 0;
-
- d->fds[d->nfds].fd = d->ctrl[0];
- d->fds[d->nfds].events = POLLIN;
- d->fds[d->nfds].revents = 0;
- d->nfds++;
+ FD_SET(d->ctrl[0], &d->readfds);
+ if (d->ctrl[0] > d->max_fds) d->max_fds = d->ctrl[0];
pn_listener_t *l = d->listener_head;
for (int i = 0; i < d->listener_count; i++) {
- d->fds[d->nfds].fd = l->fd;
- d->fds[d->nfds].events = POLLIN;
- d->fds[d->nfds].revents = 0;
- l->idx = d->nfds;
- d->nfds++;
- l = l->listener_next;
+ FD_SET(l->fd, &d->readfds);
+ if (l->fd > d->max_fds) d->max_fds = l->fd;
+ l = l->listener_next;
}
pn_connector_t *c = d->connector_head;
- for (int i = 0; i < d->connector_count; i++)
- {
- if (!c->closed) {
- d->fds[d->nfds].fd = c->fd;
- d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
- (c->status & PN_SEL_WR ? POLLOUT : 0);
- d->fds[d->nfds].revents = 0;
- c->idx = d->nfds;
- d->nfds++;
- }
- c = c->connector_next;
+ for (int i = 0; i < d->connector_count; i++) {
+ if (!c->closed && (c->status & (PN_SEL_RD|PN_SEL_WR))) {
+ if (c->status & PN_SEL_RD)
+ FD_SET(c->fd, &d->readfds);
+ if (c->status & PN_SEL_WR)
+ FD_SET(c->fd, &d->writefds);
+ if (c->fd > d->max_fds) d->max_fds = c->fd;
+ }
+ c = c->connector_next;
}
}
-void pn_driver_wait(pn_driver_t *d, int timeout) {
+void pn_driver_wait(pn_driver_t *d, int timeout)
+{
+ struct timeval to = {0};
+ if (timeout > 0) {
+ // convert millisecs to sec and usec:
+ to.tv_sec = timeout/1000;
+ to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
+ }
+
pn_driver_rebuild(d);
- DIE_IFE(poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout));
+ int nfds = select(d->max_fds + 1, &d->readfds, &d->writefds, NULL, timeout < 0 ? NULL : &to);
+ DIE_IFE(nfds);
- if (d->fds[0].revents & POLLIN) {
- //clear the pipe
- char buffer[512];
- while (read(d->ctrl[0], buffer, 512) == 512);
- }
+ if (nfds > 0) {
- pn_listener_t *l = d->listener_head;
- while (l) {
- l->pending = (l->idx && d->fds[l->idx].revents & POLLIN);
- l = l->listener_next;
- }
+ if (FD_ISSET(d->ctrl[0], &d->readfds)) {
+ //clear the pipe
+ char buffer[512];
+ while (read(d->ctrl[0], buffer, 512) == 512);
+ }
- pn_connector_t *c = d->connector_head;
- while (c) {
- if (c->closed) {
- c->pending_read = false;
- c->pending_write = false;
- c->pending_tick = false;
- } else {
- int idx = c->idx;
- c->pending_read = (idx && d->fds[idx].revents & POLLIN);
- c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
- }
- c = c->connector_next;
+ pn_listener_t *l = d->listener_head;
+ while (l) {
+ l->pending = FD_ISSET(l->fd, &d->readfds);
+ l = l->listener_next;
+ }
+
+ pn_connector_t *c = d->connector_head;
+ while (c) {
+ if (c->closed) {
+ c->pending_read = false;
+ c->pending_write = false;
+ c->pending_tick = false;
+ } else {
+ c->pending_read = FD_ISSET(c->fd, &d->readfds);
+ c->pending_write = FD_ISSET(c->fd, &d->writefds);
+ }
+ c = c->connector_next;
+ }
}
d->listener_next = d->listener_head;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org