You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/07/11 14:39:58 UTC

svn commit: r1360151 - in /qpid/proton/branches/libevent_driver/proton-c: CMakeLists.txt pn_config.h.in src/driver.c src/driver_impl.h src/drivers/ src/drivers/driver_poll.c src/drivers/driver_select.c

Author: kgiusti
Date: Wed Jul 11 12:39:58 2012
New Revision: 1360151

URL: http://svn.apache.org/viewvc?rev=1360151&view=rev
Log:
NO-JIRA: moved transport specific driver code to drivers directory

Added:
    qpid/proton/branches/libevent_driver/proton-c/src/driver_impl.h
      - copied unchanged from r1360149, qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h
    qpid/proton/branches/libevent_driver/proton-c/src/drivers/
      - copied from r1360149, qpid/proton/branches/driver_abstraction/proton-c/src/drivers/
    qpid/proton/branches/libevent_driver/proton-c/src/drivers/driver_poll.c
      - copied unchanged from r1360149, qpid/proton/branches/driver_abstraction/proton-c/src/drivers/driver_poll.c
    qpid/proton/branches/libevent_driver/proton-c/src/drivers/driver_select.c
      - copied unchanged from r1360149, qpid/proton/branches/driver_abstraction/proton-c/src/drivers/driver_select.c
Modified:
    qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt
    qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in
    qpid/proton/branches/libevent_driver/proton-c/src/driver.c

Modified: qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt?rev=1360151&r1=1360150&r2=1360151&view=diff
==============================================================================
--- qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt (original)
+++ qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt Wed Jul 11 12:39:58 2012
@@ -6,6 +6,20 @@ set (PN_VERSION_MAJOR 0)
 set (PN_VERSION_MINOR 1)
 set (LINK_DEPS uuid)
 
+include(CheckIncludeFile)
+
+CHECK_INCLUDE_FILE(poll.h HAVE_POLL_H)
+CHECK_INCLUDE_FILE(sys/select.h HAVE_SYS_SELECT_H)
+
+# Set default poller implementation (check from general to specific to allow overriding)
+if (HAVE_SYS_SELECT_H)
+  set(poller_default select)
+endif (HAVE_SYS_SELECT_H)
+if (HAVE_POLL_H)
+  set(poller_default poll)
+endif (HAVE_POLL_H)
+set(POLLER ${poller_default} CACHE STRING "Poller implementation (poll/select)")
+
 configure_file (
   "${PROJECT_SOURCE_DIR}/pn_config.h.in"
   "${PROJECT_BINARY_DIR}/pn_config.h"
@@ -26,6 +40,17 @@ add_custom_command (
   DEPENDS ${PROJECT_SOURCE_DIR}/src/protocol.h.py
 )
 
+if (POLLER STREQUAL poll)
+  set (pn_driver_impl
+       src/drivers/driver_poll.c
+  )
+elseif (POLLER STREQUAL select)
+  set (pn_driver_impl
+       src/drivers/driver_select.c
+  )
+endif (POLLER STREQUAL poll)
+
+
 add_library (
   qpidproton SHARED
 
@@ -50,6 +75,8 @@ add_library (
   ${PROJECT_BINARY_DIR}/protocol.h
 
   src/messenger.c
+
+  ${pn_driver_impl}
 )
 
 include(FindSWIG)

Modified: qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in
URL: http://svn.apache.org/viewvc/qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in?rev=1360151&r1=1360150&r2=1360151&view=diff
==============================================================================
--- qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in (original)
+++ qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in Wed Jul 11 12:39:58 2012
@@ -25,4 +25,8 @@
 #define PN_VERSION_MAJOR @PN_VERSION_MAJOR@
 #define PN_VERSION_MINOR @PN_VERSION_MINOR@
 
+
+#cmakedefine HAVE_POLL_H 1
+#cmakedefine HAVE_SYS_SELECT_H 1
+
 #endif /* pn_config.h */

Modified: qpid/proton/branches/libevent_driver/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/libevent_driver/proton-c/src/driver.c?rev=1360151&r1=1360150&r2=1360151&view=diff
==============================================================================
--- qpid/proton/branches/libevent_driver/proton-c/src/driver.c (original)
+++ qpid/proton/branches/libevent_driver/proton-c/src/driver.c Wed Jul 11 12:39:58 2012
@@ -21,7 +21,11 @@
 
 #define _POSIX_C_SOURCE 1
 
-//#include <poll.h>
+#include <proton/driver.h>
+#include <proton/sasl.h>
+#include "util.h"
+#include "driver_impl.h"
+
 #include <stdio.h>
 #include <time.h>
 #include <ctype.h>
@@ -29,81 +33,8 @@
 #include <sys/socket.h>
 #include <netdb.h>
 #include <unistd.h>
-#include <sys/select.h>
-
-#include <proton/driver.h>
-#include <proton/sasl.h>
-#include "util.h"
 
 
-/* Decls */
-
-#define PN_SEL_RD (0x0001)
-#define PN_SEL_WR (0x0002)
-
-struct pn_driver_t {
-  pn_listener_t *listener_head;
-  pn_listener_t *listener_tail;
-  pn_listener_t *listener_next;
-  pn_connector_t *connector_head;
-  pn_connector_t *connector_tail;
-  pn_connector_t *connector_next;
-  size_t listener_count;
-  size_t connector_count;
-  size_t closed_count;
-  fd_set readfds;
-  fd_set writefds;
-  int max_fds;
-  int ctrl[2]; //pipe for updating selectable status
-  pn_trace_t trace;
-};
-
-struct pn_listener_t {
-  pn_driver_t *driver;
-  pn_listener_t *listener_next;
-  pn_listener_t *listener_prev;
-  int idx;
-  bool pending;
-  int fd;
-  void *context;
-};
-
-#define IO_BUF_SIZE (4*1024)
-#define NAME_MAX (256)
-
-struct pn_connector_t {
-  pn_driver_t *driver;
-  pn_connector_t *connector_next;
-  pn_connector_t *connector_prev;
-  char name[256];
-  int idx;
-  bool pending_tick;
-  bool pending_read;
-  bool pending_write;
-  int fd;
-  int status;
-  pn_trace_t trace;
-  bool closed;
-  time_t wakeup;
-  void (*read)(pn_connector_t *);
-  void (*write) (pn_connector_t *);
-  time_t (*tick)(pn_connector_t *sel, time_t now);
-  size_t input_size;
-  char input[IO_BUF_SIZE];
-  bool input_eos;
-  size_t output_size;
-  char output[IO_BUF_SIZE];
-  pn_sasl_t *sasl;
-  pn_connection_t *connection;
-  pn_transport_t *transport;
-  ssize_t (*process_input)(pn_connector_t *);
-  ssize_t (*process_output)(pn_connector_t *);
-  bool input_done;
-  bool output_done;
-  pn_listener_t *listener;
-  void *context;
-};
-
 /* Impls */
 
 // listener
@@ -182,11 +113,12 @@ pn_listener_t *pn_listener_fd(pn_driver_
   l->driver = driver;
   l->listener_next = NULL;
   l->listener_prev = NULL;
-  l->idx = 0;
   l->pending = false;
   l->fd = fd;
   l->context = context;
 
+  pn_listener_impl_init(l);
+
   pn_driver_add_listener(driver, l);
   return l;
 }
@@ -233,7 +165,7 @@ pn_connector_t *pn_listener_accept(pn_li
       if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
         fprintf(stderr, "Accepted from %s:%s\n", host, serv);
       pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
-      snprintf(c->name, NAME_MAX, "%s:%s", host, serv);
+      snprintf(c->name, PN_CONNECTOR_NAME_MAX, "%s:%s", host, serv);
       c->listener = l;
       return c;
     }
@@ -253,6 +185,7 @@ void pn_listener_free(pn_listener_t *l)
   if (!l) return;
 
   if (l->driver) pn_driver_remove_listener(l->driver, l);
+  pn_listener_impl_destroy(l);
   free(l);
 }
 
@@ -308,7 +241,7 @@ pn_connector_t *pn_connector(pn_driver_t
   freeaddrinfo(addr);
 
   pn_connector_t *c = pn_connector_fd(driver, sock, context);
-  snprintf(c->name, NAME_MAX, "%s:%s", host, port);
+  snprintf(c->name, PN_CONNECTOR_NAME_MAX, "%s:%s", host, port);
   if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
     fprintf(stderr, "Connected to %s\n", c->name);
   return c;
@@ -340,7 +273,6 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->pending_read = false;
   c->pending_write = false;
   c->name[0] = '\0';
-  c->idx = 0;
   c->fd = fd;
   c->status = PN_SEL_RD | PN_SEL_WR;
   c->trace = driver->trace;
@@ -362,6 +294,8 @@ pn_connector_t *pn_connector_fd(pn_drive
   c->context = context;
   c->listener = NULL;
 
+  pn_connector_impl_init(c);
+
   pn_connector_trace(c, driver->trace);
 
   pn_driver_add_connector(driver, c);
@@ -435,12 +369,13 @@ void pn_connector_free(pn_connector_t *c
   ctor->connection = NULL;
   ctor->transport = NULL;
   pn_sasl_free(ctor->sasl);
+  pn_connector_impl_destroy(ctor);
   free(ctor);
 }
 
 static void pn_connector_read(pn_connector_t *ctor)
 {
-  ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
+  ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, PN_CONNECTOR_IO_BUF_SIZE - ctor->input_size, 0);
   if (n <= 0) {
     if (n < 0) perror("read");
     ctor->status &= ~PN_SEL_RD;
@@ -554,7 +489,7 @@ static char *pn_connector_output(pn_conn
 
 static size_t pn_connector_available(pn_connector_t *ctor)
 {
-  return IO_BUF_SIZE - ctor->output_size;
+  return PN_CONNECTOR_IO_BUF_SIZE - ctor->output_size;
 }
 
 static void pn_connector_process_output(pn_connector_t *ctor)
@@ -689,7 +624,6 @@ pn_driver_t *pn_driver()
   d->listener_count = 0;
   d->connector_count = 0;
   d->closed_count = 0;
-  d->max_fds = 0;
   d->ctrl[0] = 0;
   d->ctrl[1] = 0;
   d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
@@ -701,6 +635,8 @@ pn_driver_t *pn_driver()
     perror("Can't create control pipe");
   }
 
+  pn_driver_impl_init(d);
+
   return d;
 }
 
@@ -719,6 +655,8 @@ void pn_driver_free(pn_driver_t *d)
     pn_connector_free(d->connector_head);
   while (d->listener_head)
     pn_listener_free(d->listener_head);
+
+  pn_driver_impl_destroy(d);
   free(d);
 }
 
@@ -729,77 +667,10 @@ void pn_driver_wakeup(pn_driver_t *d)
   }
 }
 
-static void pn_driver_rebuild(pn_driver_t *d)
-{
-  d->max_fds = -1;
-  FD_ZERO(&d->readfds);
-  FD_ZERO(&d->writefds);
-
-  FD_SET(d->ctrl[0], &d->readfds);
-  if (d->ctrl[0] > d->max_fds) d->max_fds = d->ctrl[0];
-
-  pn_listener_t *l = d->listener_head;
-  for (int i = 0; i < d->listener_count; i++) {
-      FD_SET(l->fd, &d->readfds);
-      if (l->fd > d->max_fds) d->max_fds = l->fd;
-      l = l->listener_next;
-  }
-
-  pn_connector_t *c = d->connector_head;
-  for (int i = 0; i < d->connector_count; i++) {
-      if (!c->closed && (c->status & (PN_SEL_RD|PN_SEL_WR))) {
-          if (c->status & PN_SEL_RD)
-              FD_SET(c->fd, &d->readfds);
-          if (c->status & PN_SEL_WR)
-              FD_SET(c->fd, &d->writefds);
-          if (c->fd > d->max_fds) d->max_fds = c->fd;
-      }
-      c = c->connector_next;
-  }
-}
 
 void pn_driver_wait(pn_driver_t *d, int timeout)
 {
-  struct timeval to = {0};
-  if (timeout > 0) {
-      // convert millisecs to sec and usec:
-      to.tv_sec = timeout/1000;
-      to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
-  }
-
-  pn_driver_rebuild(d);
-
-  int nfds = select(d->max_fds + 1, &d->readfds, &d->writefds, NULL, timeout < 0 ? NULL : &to);
-  DIE_IFE(nfds);
-
-  if (nfds > 0) {
-
-      if (FD_ISSET(d->ctrl[0], &d->readfds)) {
-          //clear the pipe
-          char buffer[512];
-          while (read(d->ctrl[0], buffer, 512) == 512);
-      }
-
-      pn_listener_t *l = d->listener_head;
-      while (l) {
-          l->pending = FD_ISSET(l->fd, &d->readfds);
-          l = l->listener_next;
-      }
-
-      pn_connector_t *c = d->connector_head;
-      while (c) {
-          if (c->closed) {
-              c->pending_read = false;
-              c->pending_write = false;
-              c->pending_tick = false;
-          } else {
-              c->pending_read = FD_ISSET(c->fd, &d->readfds);
-              c->pending_write = FD_ISSET(c->fd, &d->writefds);
-          }
-          c = c->connector_next;
-      }
-  }
-
+  pn_driver_impl_wait(d, timeout);
   d->listener_next = d->listener_head;
   d->connector_next = d->connector_head;
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org