You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/07/10 19:58:49 UTC

svn commit: r1359810 - in /qpid/proton/branches/libevent_driver: ./ proton-c/src/driver.c

Author: kgiusti
Date: Tue Jul 10 17:58:48 2012
New Revision: 1359810

URL: http://svn.apache.org/viewvc?rev=1359810&view=rev
Log:
NO-JIRA: use the select() model instead of poll() for portability.

Modified:
    qpid/proton/branches/libevent_driver/   (props changed)
    qpid/proton/branches/libevent_driver/proton-c/src/driver.c

Propchange: qpid/proton/branches/libevent_driver/
------------------------------------------------------------------------------
    svn:mergeinfo = /qpid/proton/trunk:1357874-1359740

Modified: qpid/proton/branches/libevent_driver/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/libevent_driver/proton-c/src/driver.c?rev=1359810&r1=1359809&r2=1359810&view=diff
==============================================================================
--- qpid/proton/branches/libevent_driver/proton-c/src/driver.c (original)
+++ qpid/proton/branches/libevent_driver/proton-c/src/driver.c Tue Jul 10 17:58:48 2012
@@ -21,7 +21,7 @@
 
 #define _POSIX_C_SOURCE 1
 
-#include <poll.h>
+//#include <poll.h>
 #include <stdio.h>
 #include <time.h>
 #include <ctype.h>
@@ -29,6 +29,7 @@
 #include <sys/socket.h>
 #include <netdb.h>
 #include <unistd.h>
+#include <sys/select.h>
 
 #include <proton/driver.h>
 #include <proton/sasl.h>
@@ -50,9 +51,9 @@ struct pn_driver_t {
   size_t listener_count;
   size_t connector_count;
   size_t closed_count;
-  size_t capacity;
-  struct pollfd *fds;
-  size_t nfds;
+  fd_set readfds;
+  fd_set writefds;
+  int max_fds;
   int ctrl[2]; //pipe for updating selectable status
   pn_trace_t trace;
 };
@@ -688,9 +689,7 @@ pn_driver_t *pn_driver()
   d->listener_count = 0;
   d->connector_count = 0;
   d->closed_count = 0;
-  d->capacity = 0;
-  d->fds = NULL;
-  d->nfds = 0;
+  d->max_fds = 0;
   d->ctrl[0] = 0;
   d->ctrl[1] = 0;
   d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
@@ -720,7 +719,6 @@ void pn_driver_free(pn_driver_t *d)
     pn_connector_free(d->connector_head);
   while (d->listener_head)
     pn_listener_free(d->listener_head);
-  free(d->fds);
   free(d);
 }
 
@@ -733,73 +731,73 @@ void pn_driver_wakeup(pn_driver_t *d)
 
 static void pn_driver_rebuild(pn_driver_t *d)
 {
-  size_t size = d->listener_count + d->connector_count;
-  while (d->capacity < size + 1) {
-    d->capacity = d->capacity ? 2*d->capacity : 16;
-    d->fds = realloc(d->fds, d->capacity*sizeof(struct pollfd));
-  }
+  d->max_fds = -1;
+  FD_ZERO(&d->readfds);
+  FD_ZERO(&d->writefds);
 
-  d->nfds = 0;
-
-  d->fds[d->nfds].fd = d->ctrl[0];
-  d->fds[d->nfds].events = POLLIN;
-  d->fds[d->nfds].revents = 0;
-  d->nfds++;
+  FD_SET(d->ctrl[0], &d->readfds);
+  if (d->ctrl[0] > d->max_fds) d->max_fds = d->ctrl[0];
 
   pn_listener_t *l = d->listener_head;
   for (int i = 0; i < d->listener_count; i++) {
-    d->fds[d->nfds].fd = l->fd;
-    d->fds[d->nfds].events = POLLIN;
-    d->fds[d->nfds].revents = 0;
-    l->idx = d->nfds;
-    d->nfds++;
-    l = l->listener_next;
+      FD_SET(l->fd, &d->readfds);
+      if (l->fd > d->max_fds) d->max_fds = l->fd;
+      l = l->listener_next;
   }
 
   pn_connector_t *c = d->connector_head;
-  for (int i = 0; i < d->connector_count; i++)
-  {
-    if (!c->closed) {
-      d->fds[d->nfds].fd = c->fd;
-      d->fds[d->nfds].events = (c->status & PN_SEL_RD ? POLLIN : 0) |
-        (c->status & PN_SEL_WR ? POLLOUT : 0);
-      d->fds[d->nfds].revents = 0;
-      c->idx = d->nfds;
-      d->nfds++;
-    }
-    c = c->connector_next;
+  for (int i = 0; i < d->connector_count; i++) {
+      if (!c->closed && (c->status & (PN_SEL_RD|PN_SEL_WR))) {
+          if (c->status & PN_SEL_RD)
+              FD_SET(c->fd, &d->readfds);
+          if (c->status & PN_SEL_WR)
+              FD_SET(c->fd, &d->writefds);
+          if (c->fd > d->max_fds) d->max_fds = c->fd;
+      }
+      c = c->connector_next;
   }
 }
 
-void pn_driver_wait(pn_driver_t *d, int timeout) {
+void pn_driver_wait(pn_driver_t *d, int timeout)
+{
+  struct timeval to = {0};
+  if (timeout > 0) {
+      // convert millisecs to sec and usec:
+      to.tv_sec = timeout/1000;
+      to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
+  }
+
   pn_driver_rebuild(d);
 
-  DIE_IFE(poll(d->fds, d->nfds, d->closed_count > 0 ? 0 : timeout));
+  int nfds = select(d->max_fds + 1, &d->readfds, &d->writefds, NULL, timeout < 0 ? NULL : &to);
+  DIE_IFE(nfds);
 
-  if (d->fds[0].revents & POLLIN) {
-    //clear the pipe
-    char buffer[512];
-    while (read(d->ctrl[0], buffer, 512) == 512);
-  }
+  if (nfds > 0) {
 
-  pn_listener_t *l = d->listener_head;
-  while (l) {
-    l->pending = (l->idx && d->fds[l->idx].revents & POLLIN);
-    l = l->listener_next;
-  }
+      if (FD_ISSET(d->ctrl[0], &d->readfds)) {
+          //clear the pipe
+          char buffer[512];
+          while (read(d->ctrl[0], buffer, 512) == 512);
+      }
 
-  pn_connector_t *c = d->connector_head;
-  while (c) {
-    if (c->closed) {
-      c->pending_read = false;
-      c->pending_write = false;
-      c->pending_tick = false;
-    } else {
-      int idx = c->idx;
-      c->pending_read = (idx && d->fds[idx].revents & POLLIN);
-      c->pending_write = (idx && d->fds[idx].revents & POLLOUT);
-    }
-    c = c->connector_next;
+      pn_listener_t *l = d->listener_head;
+      while (l) {
+          l->pending = FD_ISSET(l->fd, &d->readfds);
+          l = l->listener_next;
+      }
+
+      pn_connector_t *c = d->connector_head;
+      while (c) {
+          if (c->closed) {
+              c->pending_read = false;
+              c->pending_write = false;
+              c->pending_tick = false;
+          } else {
+              c->pending_read = FD_ISSET(c->fd, &d->readfds);
+              c->pending_write = FD_ISSET(c->fd, &d->writefds);
+          }
+          c = c->connector_next;
+      }
   }
 
   d->listener_next = d->listener_head;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org