You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2010/01/12 20:30:28 UTC
svn commit: r898479 - in
/incubator/trafficserver/traffic/branches/dev/iocore/net: P_UnixNet.h
P_UnixPollDescriptor.h UnixNetVConnection.cc
Author: jplevyak
Date: Tue Jan 12 19:30:27 2010
New Revision: 898479
URL: http://svn.apache.org/viewvc?rev=898479&view=rev
Log:
TS-50, TS-54: This patch adds full support for FreeBSD/kqueue without libev.
It also makes edge-triggered and level-triggered event handling fully orthogonal
for USE_EPOLL and USE_KQUEUE, so that the differences between the two modes can be examined.
libev is now optional on FreeBSD and could likely be made optional on OS X as well.
For now, edge triggering remains the default.
Modified:
incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h
incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h
incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc
Modified: incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h?rev=898479&r1=898478&r2=898479&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h Tue Jan 12 19:30:27 2010
@@ -28,6 +28,7 @@
#include "List.h"
#define USE_EDGE_TRIGGER_EPOLL 1
+#define USE_EDGE_TRIGGER_KQUEUE 1
#ifdef USE_LIBEV
#define EV_MINPRI 0
@@ -68,6 +69,9 @@
#else
int fd;
#endif
+#if defined(USE_KQUEUE) || (defined(USE_EPOLL) && !defined(USE_EDGE_TRIGGERED_EPOLL))
+ int events;
+#endif
EventLoop event_loop;
int type;
union
@@ -83,11 +87,11 @@
int start(EventLoop l, UnixNetVConnection *vc, int events);
int start(EventLoop l, UnixUDPConnection *vc, int events);
int start(EventLoop l, int fd, Continuation *c, int events);
- /*
- Change the existing events by adding modify(EVENTIO_READ)
- or removing modify(-EVENTIO_READ)
- */
+ // Change the existing events by adding modify(EVENTIO_READ)
+ // or removing modify(-EVENTIO_READ), for level triggered I/O
int modify(int events);
+ // Refresh the existing events (i.e. KQUEUE EV_CLEAR), for edge triggered I/O
+ int refresh(int events);
int stop();
int close();
EventIO() {
@@ -126,8 +130,14 @@
#endif
#define EVENTIO_ERROR (EPOLLERR|EPOLLPRI|EPOLLHUP)
#elif defined(USE_KQUEUE)
-#define EVENTIO_READ EVFILT_READ
-#define EVENTIO_READ EVFILT_WRITE
+#ifdef USE_EDGE_TRIGGER_KQUEUE
+#define USE_EDGE_TRIGGER 1
+#define INK_EV_EDGE_TRIGGER EV_CLEAR
+#else
+#define INK_EV_EDGE_TRIGGER 0
+#endif
+#define EVENTIO_READ INK_EVP_IN
+#define EVENTIO_WRITE INK_EVP_OUT
#define EVENTIO_ERROR (0x010|0x002|0x020) // ERR PRI HUP
#else
#error port me
@@ -516,6 +526,10 @@
return 0;
}
+inline int EventIO::refresh(int e) {
+ return 0;
+}
+
inline int EventIO::stop() {
if (event_loop) {
ev_io_stop(event_loop->eio, &eio);
@@ -535,21 +549,83 @@
memset(&ev, 0, sizeof(ev));
ev.events = e;
ev.data.ptr = this;
+#ifndef USE_EDGE_TRIGGERED
+ events = e;
+#endif
return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
#elif defined(USE_KQUEUE)
- struct kevent ev;
- EV_SET(&ev, con[icon].fd, EVFILT_READ, EV_ADD, 0, 0, con[icon].epoll_ptr);
- return kevent(pd->kqueue_fd, &ev, 1, NULL, 0, NULL);
+ events = e;
+ struct kevent ev[2];
+ int n = 0;
+ if (e & EVENTIO_READ)
+ EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+ if (e & EVENTIO_WRITE)
+ EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+ return kevent(l->kqueue_fd, &ev[0], n, NULL, 0, NULL);
#else
#error port me
#endif
}
inline int EventIO::modify(int e) {
- (void)e;
+#if defined(USE_EPOLL) && !defined(USE_EDGE_TRIGGER)
+ struct epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
+ int ee = events;
+ if (e < 0)
+ ee &= ~(-e);
+ else
+ ee |= e;
+ events = ee;
+ ev.events = ee;
+ ev.data.ptr = this;
+ return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_MOD, fd, &ev);
+#elif defined(USE_KQUEUE) && !defined(USE_EDGE_TRIGGER)
+ int n = 0;
+ struct kevent ev[2];
+ int ee = events;
+ if (e < 0) {
+ ee &= ~(-e);
+ if ((-e) & EVENTIO_READ)
+ EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this);
+ if ((-e) & EVENTIO_WRITE)
+ EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this);
+ } else {
+ ee |= e;
+ if (e & EVENTIO_READ)
+ EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+ if (e & EVENTIO_WRITE)
+ EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+ }
+ events = ee;
+ if (n)
+ return kevent(event_loop->kqueue_fd, &ev[0], n, NULL, 0, NULL);
+ else
+ return 0;
+#else
return 0;
+#endif
}
+inline int EventIO::refresh(int e) {
+#if defined(USE_KQUEUE) && defined(USE_EDGE_TRIGGER)
+ e = e & events;
+ struct kevent ev[2];
+ int n = 0;
+ if (e & EVENTIO_READ)
+ EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+ if (e & EVENTIO_WRITE)
+ EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+ if (n)
+ return kevent(event_loop->kqueue_fd, &ev[0], n, NULL, 0, NULL);
+ else
+ return 0;
+#else
+ return 0;
+#endif
+}
+
+
inline int EventIO::stop() {
if (event_loop) {
#if defined(USE_EPOLL)
@@ -558,10 +634,17 @@
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
#elif defined(USE_KQUEUE)
+#if 0
+ // this is not necessary and may result in a race if
+ // a file descriptor is reused between polls
+ int n = 0;
struct kevent ev[2];
- EV_SET(&ev[0], con[icon].fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- EV_SET(&ev[1], con[icon].fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- return kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL);
+ if (events & EVENTIO_READ)
+ EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this);
+ if (events & EVENTIO_WRITE)
+ EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this);
+ return kevent(event_loop->kqueue_fd, &ev[0], n, NULL, 0, NULL);
+#endif
#else
#error port me
#endif
Modified: incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h?rev=898479&r1=898478&r2=898479&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h Tue Jan 12 19:30:27 2010
@@ -33,6 +33,14 @@
#ifdef USE_LIBEV
#include "ev.h"
#endif
+#if defined(USE_KQUEUE)
+#include <sys/event.h>
+#define INK_EVP_IN 0x001
+#define INK_EVP_PRI 0x002
+#define INK_EVP_OUT 0x004
+#define INK_EVP_ERR 0x010
+#define INK_EVP_HUP 0x020
+#endif
#define POLL_DESCRIPTOR_SIZE 32768
Modified: incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc?rev=898479&r1=898478&r2=898479&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc Tue Jan 12 19:30:27 2010
@@ -44,15 +44,16 @@
//
// Prototypes
//
-void net_update_priority(NetHandler * nh, UnixNetVConnection * vc, NetState * ns, int ndone);
+void net_update_priority(NetHandler *nh, UnixNetVConnection *vc, NetState *ns, int ndone);
//
-// Reschedule a UnixNetVConnection by moving VC
+// Reschedule a UnixNetVConnection by moving it
// onto or off of the ready_list
//
static inline void
-read_reschedule(NetHandler * nh, UnixNetVConnection * vc)
+read_reschedule(NetHandler *nh, UnixNetVConnection *vc)
{
+ vc->ep.refresh(EVENTIO_READ);
if (vc->read.triggered && vc->read.enabled) {
nh->read_ready_list.in_or_enqueue(vc);
} else
@@ -60,8 +61,9 @@
}
static inline void
-write_reschedule(NetHandler * nh, UnixNetVConnection * vc)
+write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
{
+ vc->ep.refresh(EVENTIO_WRITE);
if (vc->write.triggered && vc->write.enabled) {
nh->write_ready_list.in_or_enqueue(vc);
} else
@@ -69,7 +71,7 @@
}
void
-net_activity(UnixNetVConnection * vc, EThread * thread)
+net_activity(UnixNetVConnection *vc, EThread *thread)
{
(void) thread;
#ifdef INACTIVITY_TIMEOUT
@@ -85,9 +87,8 @@
}
#else
vc->next_inactivity_timeout_at = 0;
- if (vc->inactivity_timeout_in) {
+ if (vc->inactivity_timeout_in)
vc->next_inactivity_timeout_at = ink_get_hrtime() + vc->inactivity_timeout_in;
- }
#endif
}
@@ -96,7 +97,7 @@
// Function used to close a UnixNetVConnection and free the vc
//
void
-close_UnixNetVConnection(UnixNetVConnection * vc, EThread * t)
+close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
{
if (vc->loggingEnabled()) {
vc->addLogMessage("close_UnixNetVConnection");
@@ -142,7 +143,7 @@
// Signal an event
//
static inline int
-read_signal_and_update(int event, UnixNetVConnection * vc)
+read_signal_and_update(int event, UnixNetVConnection *vc)
{
vc->recursion++;
vc->read.vio._cont->handleEvent(event, &vc->read.vio);
@@ -157,7 +158,7 @@
}
static inline int
-write_signal_and_update(int event, UnixNetVConnection * vc)
+write_signal_and_update(int event, UnixNetVConnection *vc)
{
vc->recursion++;
vc->write.vio._cont->handleEvent(event, &vc->write.vio);
@@ -172,7 +173,7 @@
}
static inline int
-read_signal_done(int event, NetHandler * nh, UnixNetVConnection * vc)
+read_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
{
vc->read.enabled = 0;
if (read_signal_and_update(event, vc) == EVENT_DONE) {
@@ -184,7 +185,7 @@
}
static inline int
-write_signal_done(int event, NetHandler * nh, UnixNetVConnection * vc)
+write_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
{
vc->write.enabled = 0;
if (write_signal_and_update(event, vc) == EVENT_DONE) {
@@ -196,14 +197,14 @@
}
static inline int
-read_signal_error(NetHandler * nh, UnixNetVConnection * vc, int lerrno)
+read_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
{
vc->lerrno = lerrno;
return read_signal_done(VC_EVENT_ERROR, nh, vc);
}
static inline int
-write_signal_error(NetHandler * nh, UnixNetVConnection * vc, int lerrno)
+write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
{
vc->lerrno = lerrno;
return write_signal_done(VC_EVENT_ERROR, nh, vc);
@@ -214,11 +215,11 @@
// onto or off of the ready_list.
// Had to wrap this function with net_read_io for SSL.
static void
-read_from_net(NetHandler * nh, UnixNetVConnection * vc, EThread * thread)
+read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
{
vc->addLogMessage("read from net");
- NetState * s = &vc->read;
- ProxyMutex * mutex = thread->mutex;
+ NetState *s = &vc->read;
+ ProxyMutex *mutex = thread->mutex;
MIOBufferAccessor & buf = s->vio.buffer;
int r = 0;
@@ -252,7 +253,7 @@
int niov = 0;
IOVec tiovec[NET_MAX_IOV];
if (toread) {
- IOBufferBlock * b = buf.mbuf->_writer;
+ IOBufferBlock *b = buf.mbuf->_writer;
do {
niov = 0;
rattempted = 0;
@@ -295,7 +296,6 @@
// check for errors
if (r <= 0) {
- // If the socket was not ready,add into the WaitList
if (r == -EAGAIN || r == -ENOTCONN) {
NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_nodata_stat, 1);
vc->addLogMessage("EAGAIN or ENOTCONN");
@@ -324,9 +324,8 @@
#endif
s->vio.ndone += r;
net_activity(vc, thread);
- } else {
+ } else
r = 0;
- }
// Signal read ready, check if user is not done
if (r) {
@@ -337,10 +336,8 @@
Debug("ssl", "read_from_net, read finished - signal done");
return;
} else {
- if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT) {
+ if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT)
return;
- }
- // ink_assert(s->enabled);
// change of lock... don't look at shared variables!
if (lock.m.m_ptr != s->vio.mutex.m_ptr) {
read_reschedule(nh, vc);
@@ -363,7 +360,7 @@
// Rescheduling the UnixNetVConnection when necessary.
//
void
-write_to_net(NetHandler * nh, UnixNetVConnection * vc, PollDescriptor * pd, EThread * thread)
+write_to_net(NetHandler *nh, UnixNetVConnection *vc, PollDescriptor *pd, EThread *thread)
{
ProxyMutex *mutex = thread->mutex;
@@ -375,7 +372,7 @@
void
-write_to_net_io(NetHandler * nh, UnixNetVConnection * vc, EThread * thread)
+write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
{
vc->addLogMessage("write to net io");
@@ -526,7 +523,7 @@
VIO *
-UnixNetVConnection::do_io_read(Continuation * c, ink64 nbytes, MIOBuffer * buf)
+UnixNetVConnection::do_io_read(Continuation *c, ink64 nbytes, MIOBuffer *buf)
{
ink_assert(!closed);
if (buf)
@@ -548,7 +545,7 @@
}
VIO *
-UnixNetVConnection::do_io_write(Continuation * acont, ink64 anbytes, IOBufferReader * abuffer, bool owner)
+UnixNetVConnection::do_io_write(Continuation *acont, ink64 anbytes, IOBufferReader *abuffer, bool owner)
{
addLogMessage("do_io_write");
ink_assert(!closed);
@@ -636,7 +633,7 @@
}
int
-OOB_callback::retry_OOB_send(int event, Event * e)
+OOB_callback::retry_OOB_send(int event, Event *e)
{
(void) event;
(void) e;
@@ -663,7 +660,7 @@
}
Action *
-UnixNetVConnection::send_OOB(Continuation * cont, char *buf, int len)
+UnixNetVConnection::send_OOB(Continuation *cont, char *buf, int len)
{
UnixNetVConnection *u = (UnixNetVConnection *) this;
ink_debug_assert(len > 0);
@@ -699,7 +696,7 @@
// writing.
//
void
-UnixNetVConnection::reenable(VIO * vio)
+UnixNetVConnection::reenable(VIO *vio)
{
if (STATE_FROM_VIO(vio)->enabled)
return;
@@ -711,12 +708,14 @@
if (nh->mutex->thread_holding == t) {
if (vio == &read.vio) {
ep.modify(EVENTIO_READ);
+ ep.refresh(EVENTIO_READ);
if (read.triggered)
nh->read_ready_list.in_or_enqueue(this);
else
nh->read_ready_list.remove(this);
} else {
ep.modify(EVENTIO_WRITE);
+ ep.refresh(EVENTIO_WRITE);
if (write.triggered)
nh->write_ready_list.in_or_enqueue(this);
else
@@ -739,12 +738,14 @@
} else {
if (vio == &read.vio) {
ep.modify(EVENTIO_READ);
+ ep.refresh(EVENTIO_READ);
if (read.triggered)
nh->read_ready_list.in_or_enqueue(this);
else
nh->read_ready_list.remove(this);
} else {
ep.modify(EVENTIO_WRITE);
+ ep.refresh(EVENTIO_WRITE);
if (write.triggered)
nh->write_ready_list.in_or_enqueue(this);
else
@@ -755,7 +756,7 @@
}
void
-UnixNetVConnection::reenable_re(VIO * vio)
+UnixNetVConnection::reenable_re(VIO *vio)
{
if (!thread)
return;
@@ -765,12 +766,14 @@
set_enabled(vio);
if (vio == &read.vio) {
ep.modify(EVENTIO_READ);
+ ep.refresh(EVENTIO_READ);
if (read.triggered)
net_read_io(nh, t);
else
nh->read_ready_list.remove(this);
} else {
ep.modify(EVENTIO_WRITE);
+ ep.refresh(EVENTIO_WRITE);
if (write.triggered)
write_to_net(nh, this, NULL, t);
else
@@ -799,7 +802,7 @@
// Private methods
void
-UnixNetVConnection::set_enabled(VIO * vio)
+UnixNetVConnection::set_enabled(VIO *vio)
{
ink_debug_assert(vio->mutex->thread_holding == this_ethread());
ink_assert(!closed);
@@ -814,7 +817,7 @@
}
void
-UnixNetVConnection::net_read_io(NetHandler * nh, EThread * lthread)
+UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
{
read_from_net(nh, this, lthread);
}
@@ -869,19 +872,19 @@
}
void
-UnixNetVConnection::readDisable(NetHandler * nh)
+UnixNetVConnection::readDisable(NetHandler *nh)
{
read_disable(nh, this);
}
void
-UnixNetVConnection::readSignalError(NetHandler * nh, int err)
+UnixNetVConnection::readSignalError(NetHandler *nh, int err)
{
read_signal_error(nh, this, err);
}
int
-UnixNetVConnection::readSignalDone(int event, NetHandler * nh)
+UnixNetVConnection::readSignalDone(int event, NetHandler *nh)
{
return (read_signal_done(event, nh, this));
}
@@ -897,19 +900,19 @@
// without affecting regular net stuff or copying a bunch of code into
// the header files.
void
-UnixNetVConnection::readReschedule(NetHandler * nh)
+UnixNetVConnection::readReschedule(NetHandler *nh)
{
read_reschedule(nh, this);
}
void
-UnixNetVConnection::netActivity(EThread * lthread)
+UnixNetVConnection::netActivity(EThread *lthread)
{
net_activity(this, lthread);
}
int
-UnixNetVConnection::startEvent(int event, Event * e)
+UnixNetVConnection::startEvent(int event, Event *e)
{
(void) event;
MUTEX_TRY_LOCK(lock, action_.mutex, e->ethread);
@@ -927,7 +930,7 @@
}
int
-UnixNetVConnection::acceptEvent(int event, Event * e)
+UnixNetVConnection::acceptEvent(int event, Event *e)
{
(void) event;
thread = e->ethread;
@@ -974,7 +977,7 @@
// and for active and inactivity timeouts.
//
int
-UnixNetVConnection::mainEvent(int event, Event * e)
+UnixNetVConnection::mainEvent(int event, Event *e)
{
addLogMessage("main event");
ink_debug_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL);
@@ -1051,7 +1054,7 @@
int
-UnixNetVConnection::connectUp(EThread * t)
+UnixNetVConnection::connectUp(EThread *t)
{
addLogMessage("connectUp");
@@ -1134,7 +1137,7 @@
void
-UnixNetVConnection::free(EThread * t)
+UnixNetVConnection::free(EThread *t)
{
NET_DECREMENT_THREAD_DYN_STAT(net_connections_currently_open_stat, t);
// clear variables for reuse