You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2012/07/11 14:39:58 UTC
svn commit: r1360151 - in /qpid/proton/branches/libevent_driver/proton-c:
CMakeLists.txt pn_config.h.in src/driver.c src/driver_impl.h src/drivers/
src/drivers/driver_poll.c src/drivers/driver_select.c
Author: kgiusti
Date: Wed Jul 11 12:39:58 2012
New Revision: 1360151
URL: http://svn.apache.org/viewvc?rev=1360151&view=rev
Log:
NO-JIRA: moved transport specific driver code to drivers directory
Added:
qpid/proton/branches/libevent_driver/proton-c/src/driver_impl.h
- copied unchanged from r1360149, qpid/proton/branches/driver_abstraction/proton-c/src/driver_impl.h
qpid/proton/branches/libevent_driver/proton-c/src/drivers/
- copied from r1360149, qpid/proton/branches/driver_abstraction/proton-c/src/drivers/
qpid/proton/branches/libevent_driver/proton-c/src/drivers/driver_poll.c
- copied unchanged from r1360149, qpid/proton/branches/driver_abstraction/proton-c/src/drivers/driver_poll.c
qpid/proton/branches/libevent_driver/proton-c/src/drivers/driver_select.c
- copied unchanged from r1360149, qpid/proton/branches/driver_abstraction/proton-c/src/drivers/driver_select.c
Modified:
qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt
qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in
qpid/proton/branches/libevent_driver/proton-c/src/driver.c
Modified: qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt?rev=1360151&r1=1360150&r2=1360151&view=diff
==============================================================================
--- qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt (original)
+++ qpid/proton/branches/libevent_driver/proton-c/CMakeLists.txt Wed Jul 11 12:39:58 2012
@@ -6,6 +6,20 @@ set (PN_VERSION_MAJOR 0)
set (PN_VERSION_MINOR 1)
set (LINK_DEPS uuid)
+include(CheckIncludeFile)
+
+CHECK_INCLUDE_FILE(poll.h HAVE_POLL_H)
+CHECK_INCLUDE_FILE(sys/select.h HAVE_SYS_SELECT_H)
+
+# Set default poller implementation (check from general to specific to allow overriding)
+if (HAVE_SYS_SELECT_H)
+ set(poller_default select)
+endif (HAVE_SYS_SELECT_H)
+if (HAVE_POLL_H)
+ set(poller_default poll)
+endif (HAVE_POLL_H)
+set(POLLER ${poller_default} CACHE STRING "Poller implementation (poll/select)")
+
configure_file (
"${PROJECT_SOURCE_DIR}/pn_config.h.in"
"${PROJECT_BINARY_DIR}/pn_config.h"
@@ -26,6 +40,17 @@ add_custom_command (
DEPENDS ${PROJECT_SOURCE_DIR}/src/protocol.h.py
)
+if (POLLER STREQUAL poll)
+ set (pn_driver_impl
+ src/drivers/driver_poll.c
+ )
+elseif (POLLER STREQUAL select)
+ set (pn_driver_impl
+ src/drivers/driver_select.c
+ )
+endif (POLLER STREQUAL poll)
+
+
add_library (
qpidproton SHARED
@@ -50,6 +75,8 @@ add_library (
${PROJECT_BINARY_DIR}/protocol.h
src/messenger.c
+
+ ${pn_driver_impl}
)
include(FindSWIG)
Modified: qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in
URL: http://svn.apache.org/viewvc/qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in?rev=1360151&r1=1360150&r2=1360151&view=diff
==============================================================================
--- qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in (original)
+++ qpid/proton/branches/libevent_driver/proton-c/pn_config.h.in Wed Jul 11 12:39:58 2012
@@ -25,4 +25,8 @@
#define PN_VERSION_MAJOR @PN_VERSION_MAJOR@
#define PN_VERSION_MINOR @PN_VERSION_MINOR@
+
+#cmakedefine HAVE_POLL_H 1
+#cmakedefine HAVE_SYS_SELECT_H 1
+
#endif /* pn_config.h */
Modified: qpid/proton/branches/libevent_driver/proton-c/src/driver.c
URL: http://svn.apache.org/viewvc/qpid/proton/branches/libevent_driver/proton-c/src/driver.c?rev=1360151&r1=1360150&r2=1360151&view=diff
==============================================================================
--- qpid/proton/branches/libevent_driver/proton-c/src/driver.c (original)
+++ qpid/proton/branches/libevent_driver/proton-c/src/driver.c Wed Jul 11 12:39:58 2012
@@ -21,7 +21,11 @@
#define _POSIX_C_SOURCE 1
-//#include <poll.h>
+#include <proton/driver.h>
+#include <proton/sasl.h>
+#include "util.h"
+#include "driver_impl.h"
+
#include <stdio.h>
#include <time.h>
#include <ctype.h>
@@ -29,81 +33,8 @@
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
-#include <sys/select.h>
-
-#include <proton/driver.h>
-#include <proton/sasl.h>
-#include "util.h"
-/* Decls */
-
-#define PN_SEL_RD (0x0001)
-#define PN_SEL_WR (0x0002)
-
-struct pn_driver_t {
- pn_listener_t *listener_head;
- pn_listener_t *listener_tail;
- pn_listener_t *listener_next;
- pn_connector_t *connector_head;
- pn_connector_t *connector_tail;
- pn_connector_t *connector_next;
- size_t listener_count;
- size_t connector_count;
- size_t closed_count;
- fd_set readfds;
- fd_set writefds;
- int max_fds;
- int ctrl[2]; //pipe for updating selectable status
- pn_trace_t trace;
-};
-
-struct pn_listener_t {
- pn_driver_t *driver;
- pn_listener_t *listener_next;
- pn_listener_t *listener_prev;
- int idx;
- bool pending;
- int fd;
- void *context;
-};
-
-#define IO_BUF_SIZE (4*1024)
-#define NAME_MAX (256)
-
-struct pn_connector_t {
- pn_driver_t *driver;
- pn_connector_t *connector_next;
- pn_connector_t *connector_prev;
- char name[256];
- int idx;
- bool pending_tick;
- bool pending_read;
- bool pending_write;
- int fd;
- int status;
- pn_trace_t trace;
- bool closed;
- time_t wakeup;
- void (*read)(pn_connector_t *);
- void (*write) (pn_connector_t *);
- time_t (*tick)(pn_connector_t *sel, time_t now);
- size_t input_size;
- char input[IO_BUF_SIZE];
- bool input_eos;
- size_t output_size;
- char output[IO_BUF_SIZE];
- pn_sasl_t *sasl;
- pn_connection_t *connection;
- pn_transport_t *transport;
- ssize_t (*process_input)(pn_connector_t *);
- ssize_t (*process_output)(pn_connector_t *);
- bool input_done;
- bool output_done;
- pn_listener_t *listener;
- void *context;
-};
-
/* Impls */
// listener
@@ -182,11 +113,12 @@ pn_listener_t *pn_listener_fd(pn_driver_
l->driver = driver;
l->listener_next = NULL;
l->listener_prev = NULL;
- l->idx = 0;
l->pending = false;
l->fd = fd;
l->context = context;
+ pn_listener_impl_init(l);
+
pn_driver_add_listener(driver, l);
return l;
}
@@ -233,7 +165,7 @@ pn_connector_t *pn_listener_accept(pn_li
if (l->driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
fprintf(stderr, "Accepted from %s:%s\n", host, serv);
pn_connector_t *c = pn_connector_fd(l->driver, sock, NULL);
- snprintf(c->name, NAME_MAX, "%s:%s", host, serv);
+ snprintf(c->name, PN_CONNECTOR_NAME_MAX, "%s:%s", host, serv);
c->listener = l;
return c;
}
@@ -253,6 +185,7 @@ void pn_listener_free(pn_listener_t *l)
if (!l) return;
if (l->driver) pn_driver_remove_listener(l->driver, l);
+ pn_listener_impl_destroy(l);
free(l);
}
@@ -308,7 +241,7 @@ pn_connector_t *pn_connector(pn_driver_t
freeaddrinfo(addr);
pn_connector_t *c = pn_connector_fd(driver, sock, context);
- snprintf(c->name, NAME_MAX, "%s:%s", host, port);
+ snprintf(c->name, PN_CONNECTOR_NAME_MAX, "%s:%s", host, port);
if (driver->trace & (PN_TRACE_FRM | PN_TRACE_RAW | PN_TRACE_DRV))
fprintf(stderr, "Connected to %s\n", c->name);
return c;
@@ -340,7 +273,6 @@ pn_connector_t *pn_connector_fd(pn_drive
c->pending_read = false;
c->pending_write = false;
c->name[0] = '\0';
- c->idx = 0;
c->fd = fd;
c->status = PN_SEL_RD | PN_SEL_WR;
c->trace = driver->trace;
@@ -362,6 +294,8 @@ pn_connector_t *pn_connector_fd(pn_drive
c->context = context;
c->listener = NULL;
+ pn_connector_impl_init(c);
+
pn_connector_trace(c, driver->trace);
pn_driver_add_connector(driver, c);
@@ -435,12 +369,13 @@ void pn_connector_free(pn_connector_t *c
ctor->connection = NULL;
ctor->transport = NULL;
pn_sasl_free(ctor->sasl);
+ pn_connector_impl_destroy(ctor);
free(ctor);
}
static void pn_connector_read(pn_connector_t *ctor)
{
- ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, IO_BUF_SIZE - ctor->input_size, 0);
+ ssize_t n = recv(ctor->fd, ctor->input + ctor->input_size, PN_CONNECTOR_IO_BUF_SIZE - ctor->input_size, 0);
if (n <= 0) {
if (n < 0) perror("read");
ctor->status &= ~PN_SEL_RD;
@@ -554,7 +489,7 @@ static char *pn_connector_output(pn_conn
static size_t pn_connector_available(pn_connector_t *ctor)
{
- return IO_BUF_SIZE - ctor->output_size;
+ return PN_CONNECTOR_IO_BUF_SIZE - ctor->output_size;
}
static void pn_connector_process_output(pn_connector_t *ctor)
@@ -689,7 +624,6 @@ pn_driver_t *pn_driver()
d->listener_count = 0;
d->connector_count = 0;
d->closed_count = 0;
- d->max_fds = 0;
d->ctrl[0] = 0;
d->ctrl[1] = 0;
d->trace = ((pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
@@ -701,6 +635,8 @@ pn_driver_t *pn_driver()
perror("Can't create control pipe");
}
+ pn_driver_impl_init(d);
+
return d;
}
@@ -719,6 +655,8 @@ void pn_driver_free(pn_driver_t *d)
pn_connector_free(d->connector_head);
while (d->listener_head)
pn_listener_free(d->listener_head);
+
+ pn_driver_impl_destroy(d);
free(d);
}
@@ -729,77 +667,10 @@ void pn_driver_wakeup(pn_driver_t *d)
}
}
-static void pn_driver_rebuild(pn_driver_t *d)
-{
- d->max_fds = -1;
- FD_ZERO(&d->readfds);
- FD_ZERO(&d->writefds);
-
- FD_SET(d->ctrl[0], &d->readfds);
- if (d->ctrl[0] > d->max_fds) d->max_fds = d->ctrl[0];
-
- pn_listener_t *l = d->listener_head;
- for (int i = 0; i < d->listener_count; i++) {
- FD_SET(l->fd, &d->readfds);
- if (l->fd > d->max_fds) d->max_fds = l->fd;
- l = l->listener_next;
- }
-
- pn_connector_t *c = d->connector_head;
- for (int i = 0; i < d->connector_count; i++) {
- if (!c->closed && (c->status & (PN_SEL_RD|PN_SEL_WR))) {
- if (c->status & PN_SEL_RD)
- FD_SET(c->fd, &d->readfds);
- if (c->status & PN_SEL_WR)
- FD_SET(c->fd, &d->writefds);
- if (c->fd > d->max_fds) d->max_fds = c->fd;
- }
- c = c->connector_next;
- }
-}
void pn_driver_wait(pn_driver_t *d, int timeout)
{
- struct timeval to = {0};
- if (timeout > 0) {
- // convert millisecs to sec and usec:
- to.tv_sec = timeout/1000;
- to.tv_usec = (timeout - (to.tv_sec * 1000)) * 1000;
- }
-
- pn_driver_rebuild(d);
-
- int nfds = select(d->max_fds + 1, &d->readfds, &d->writefds, NULL, timeout < 0 ? NULL : &to);
- DIE_IFE(nfds);
-
- if (nfds > 0) {
-
- if (FD_ISSET(d->ctrl[0], &d->readfds)) {
- //clear the pipe
- char buffer[512];
- while (read(d->ctrl[0], buffer, 512) == 512);
- }
-
- pn_listener_t *l = d->listener_head;
- while (l) {
- l->pending = FD_ISSET(l->fd, &d->readfds);
- l = l->listener_next;
- }
-
- pn_connector_t *c = d->connector_head;
- while (c) {
- if (c->closed) {
- c->pending_read = false;
- c->pending_write = false;
- c->pending_tick = false;
- } else {
- c->pending_read = FD_ISSET(c->fd, &d->readfds);
- c->pending_write = FD_ISSET(c->fd, &d->writefds);
- }
- c = c->connector_next;
- }
- }
-
+ pn_driver_impl_wait(d, timeout);
d->listener_next = d->listener_head;
d->connector_next = d->connector_head;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org