You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2010/01/12 20:30:28 UTC

svn commit: r898479 - in /incubator/trafficserver/traffic/branches/dev/iocore/net: P_UnixNet.h P_UnixPollDescriptor.h UnixNetVConnection.cc

Author: jplevyak
Date: Tue Jan 12 19:30:27 2010
New Revision: 898479

URL: http://svn.apache.org/viewvc?rev=898479&view=rev
Log:
TS-50, TS-54: this patch fully supports FreeBSD/KQUEUE without libev.
It also makes edge triggering and level triggering of events fully orthogonal
for USE_EPOLL and USE_KQUEUE so that the differences can be examined.
libev is now optional on FreeBSD and likely could be made optional on OSX.
Currently the default remains edge triggering.

Modified:
    incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h
    incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h
    incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc

Modified: incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h?rev=898479&r1=898478&r2=898479&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNet.h Tue Jan 12 19:30:27 2010
@@ -28,6 +28,7 @@
 #include "List.h"
 
 #define USE_EDGE_TRIGGER_EPOLL  1
+#define USE_EDGE_TRIGGER_KQUEUE  1
 
 #ifdef USE_LIBEV
 #define EV_MINPRI 0
@@ -68,6 +69,9 @@
 #else
   int fd;
 #endif
+#if defined(USE_KQUEUE) || (defined(USE_EPOLL) && !defined(USE_EDGE_TRIGGERED_EPOLL))
+  int events;
+#endif
   EventLoop event_loop;
   int type;
   union
@@ -83,11 +87,11 @@
   int start(EventLoop l, UnixNetVConnection *vc, int events);
   int start(EventLoop l, UnixUDPConnection *vc, int events);
   int start(EventLoop l, int fd, Continuation *c, int events);
-  /*
-    Change the existing events by adding modify(EVENTIO_READ)
-    or removing modify(-EVENTIO_READ)
-  */
+  // Change the existing events by adding modify(EVENTIO_READ)
+  // or removing modify(-EVENTIO_READ), for level triggered I/O
   int modify(int events);
+  // Refresh the existing events (i.e. KQUEUE EV_CLEAR), for edge triggered I/O
+  int refresh(int events);
   int stop();
   int close();
   EventIO() { 
@@ -126,8 +130,14 @@
 #endif
 #define EVENTIO_ERROR (EPOLLERR|EPOLLPRI|EPOLLHUP)
 #elif defined(USE_KQUEUE)
-#define EVENTIO_READ EVFILT_READ
-#define EVENTIO_READ EVFILT_WRITE
+#ifdef USE_EDGE_TRIGGER_KQUEUE
+#define USE_EDGE_TRIGGER 1
+#define INK_EV_EDGE_TRIGGER EV_CLEAR
+#else
+#define INK_EV_EDGE_TRIGGER 0
+#endif
+#define EVENTIO_READ INK_EVP_IN
+#define EVENTIO_WRITE INK_EVP_OUT
 #define EVENTIO_ERROR (0x010|0x002|0x020) // ERR PRI HUP
 #else
 #error port me
@@ -516,6 +526,10 @@
   return 0;
 }
 
+inline int EventIO::refresh(int e) {
+  return 0;
+}
+
 inline int EventIO::stop() {
   if (event_loop) {
     ev_io_stop(event_loop->eio, &eio);
@@ -535,21 +549,83 @@
   memset(&ev, 0, sizeof(ev));
   ev.events = e;
   ev.data.ptr = this;
+#ifndef USE_EDGE_TRIGGERED
+  events = e;
+#endif
   return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
 #elif defined(USE_KQUEUE)
-  struct kevent ev;
-  EV_SET(&ev, con[icon].fd, EVFILT_READ, EV_ADD, 0, 0, con[icon].epoll_ptr);
-  return kevent(pd->kqueue_fd, &ev, 1, NULL, 0, NULL);
+  events = e;
+  struct kevent ev[2];
+  int n = 0;
+  if (e & EVENTIO_READ)
+    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+  if (e & EVENTIO_WRITE)
+    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+  return kevent(l->kqueue_fd, &ev[0], n, NULL, 0, NULL);
 #else
 #error port me
 #endif
 }
 
 inline int EventIO::modify(int e) {
-  (void)e;
+#if defined(USE_EPOLL) && !defined(USE_EDGE_TRIGGER)
+  struct epoll_event ev;
+  memset(&ev, 0, sizeof(ev));
+  int ee = events;
+  if (e < 0)
+    ee &= ~(-e);
+  else
+    ee |= e;
+  events = ee;
+  ev.events = ee;
+  ev.data.ptr = this;
+  return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_MOD, fd, &ev);
+#elif defined(USE_KQUEUE) && !defined(USE_EDGE_TRIGGER)
+  int n = 0;
+  struct kevent ev[2];
+  int ee = events;
+  if (e < 0) {
+    ee &= ~(-e);
+    if ((-e) & EVENTIO_READ)
+      EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this);
+    if ((-e) & EVENTIO_WRITE)
+      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this);
+  } else {
+    ee |= e;
+    if (e & EVENTIO_READ)
+      EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+    if (e & EVENTIO_WRITE)
+      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+  }
+  events = ee;
+  if (n)
+    return kevent(event_loop->kqueue_fd, &ev[0], n, NULL, 0, NULL);
+  else
+    return 0;
+#else
   return 0;
+#endif
 }
 
+inline int EventIO::refresh(int e) {
+#if defined(USE_KQUEUE) && defined(USE_EDGE_TRIGGER)
+  e = e & events;
+  struct kevent ev[2];
+  int n = 0;
+  if (e & EVENTIO_READ)
+    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+  if (e & EVENTIO_WRITE)
+    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD|INK_EV_EDGE_TRIGGER, 0, 0, this);
+  if (n)
+    return kevent(event_loop->kqueue_fd, &ev[0], n, NULL, 0, NULL);
+  else
+    return 0;
+#else
+  return 0;
+#endif
+}
+
+
 inline int EventIO::stop() {
   if (event_loop) {
 #if defined(USE_EPOLL)
@@ -558,10 +634,17 @@
     ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
     return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
 #elif defined(USE_KQUEUE)
+#if 0
+    // this is not necessary and may result in a race if
+    // a file descriptor is reused between polls
+    int n = 0;
     struct kevent ev[2];
-    EV_SET(&ev[0], con[icon].fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-    EV_SET(&ev[1], con[icon].fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-    return kevent(pd->kqueue_fd, &ev[0], 2, NULL, 0, NULL);
+    if (events & EVENTIO_READ)
+      EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this);
+    if (events & EVENTIO_WRITE)
+      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this);
+    return kevent(event_loop->kqueue_fd, &ev[0], n, NULL, 0, NULL);
+#endif
 #else
 #error port me
 #endif

Modified: incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h?rev=898479&r1=898478&r2=898479&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixPollDescriptor.h Tue Jan 12 19:30:27 2010
@@ -33,6 +33,14 @@
 #ifdef USE_LIBEV
 #include "ev.h"
 #endif
+#if defined(USE_KQUEUE)
+#include <sys/event.h>
+#define INK_EVP_IN    0x001
+#define INK_EVP_PRI   0x002
+#define INK_EVP_OUT   0x004
+#define INK_EVP_ERR   0x010
+#define INK_EVP_HUP   0x020
+#endif
 
 #define POLL_DESCRIPTOR_SIZE 32768
 

Modified: incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc?rev=898479&r1=898478&r2=898479&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc Tue Jan 12 19:30:27 2010
@@ -44,15 +44,16 @@
 //
 // Prototypes
 //
-void net_update_priority(NetHandler * nh, UnixNetVConnection * vc, NetState * ns, int ndone);
+void net_update_priority(NetHandler *nh, UnixNetVConnection *vc, NetState *ns, int ndone);
 
 //
-// Reschedule a UnixNetVConnection by moving VC 
+// Reschedule a UnixNetVConnection by moving it
 // onto or off of the ready_list
 //
 static inline void
-read_reschedule(NetHandler * nh, UnixNetVConnection * vc)
+read_reschedule(NetHandler *nh, UnixNetVConnection *vc)
 {
+  vc->ep.refresh(EVENTIO_READ);
   if (vc->read.triggered && vc->read.enabled) {
     nh->read_ready_list.in_or_enqueue(vc);
   } else
@@ -60,8 +61,9 @@
 }
 
 static inline void
-write_reschedule(NetHandler * nh, UnixNetVConnection * vc)
+write_reschedule(NetHandler *nh, UnixNetVConnection *vc)
 {
+  vc->ep.refresh(EVENTIO_WRITE);
   if (vc->write.triggered && vc->write.enabled) {
     nh->write_ready_list.in_or_enqueue(vc);
   } else
@@ -69,7 +71,7 @@
 }
 
 void
-net_activity(UnixNetVConnection * vc, EThread * thread)
+net_activity(UnixNetVConnection *vc, EThread *thread)
 {
   (void) thread;
 #ifdef INACTIVITY_TIMEOUT
@@ -85,9 +87,8 @@
   }
 #else
   vc->next_inactivity_timeout_at = 0;
-  if (vc->inactivity_timeout_in) {
+  if (vc->inactivity_timeout_in)
     vc->next_inactivity_timeout_at = ink_get_hrtime() + vc->inactivity_timeout_in;
-  }
 #endif
 
 }
@@ -96,7 +97,7 @@
 // Function used to close a UnixNetVConnection and free the vc
 //
 void
-close_UnixNetVConnection(UnixNetVConnection * vc, EThread * t)
+close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
 {
   if (vc->loggingEnabled()) {
     vc->addLogMessage("close_UnixNetVConnection");
@@ -142,7 +143,7 @@
 // Signal an event
 //
 static inline int
-read_signal_and_update(int event, UnixNetVConnection * vc)
+read_signal_and_update(int event, UnixNetVConnection *vc)
 {
   vc->recursion++;
   vc->read.vio._cont->handleEvent(event, &vc->read.vio);
@@ -157,7 +158,7 @@
 }
 
 static inline int
-write_signal_and_update(int event, UnixNetVConnection * vc)
+write_signal_and_update(int event, UnixNetVConnection *vc)
 {
   vc->recursion++;
   vc->write.vio._cont->handleEvent(event, &vc->write.vio);
@@ -172,7 +173,7 @@
 }
 
 static inline int
-read_signal_done(int event, NetHandler * nh, UnixNetVConnection * vc)
+read_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
 {
   vc->read.enabled = 0;
   if (read_signal_and_update(event, vc) == EVENT_DONE) {
@@ -184,7 +185,7 @@
 }
 
 static inline int
-write_signal_done(int event, NetHandler * nh, UnixNetVConnection * vc)
+write_signal_done(int event, NetHandler *nh, UnixNetVConnection *vc)
 {
   vc->write.enabled = 0;
   if (write_signal_and_update(event, vc) == EVENT_DONE) {
@@ -196,14 +197,14 @@
 }
 
 static inline int
-read_signal_error(NetHandler * nh, UnixNetVConnection * vc, int lerrno)
+read_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
 {
   vc->lerrno = lerrno;
   return read_signal_done(VC_EVENT_ERROR, nh, vc);
 }
 
 static inline int
-write_signal_error(NetHandler * nh, UnixNetVConnection * vc, int lerrno)
+write_signal_error(NetHandler *nh, UnixNetVConnection *vc, int lerrno)
 {
   vc->lerrno = lerrno;
   return write_signal_done(VC_EVENT_ERROR, nh, vc);
@@ -214,11 +215,11 @@
 // onto or off of the ready_list.
 // Had to wrap this function with net_read_io for SSL.
 static void
-read_from_net(NetHandler * nh, UnixNetVConnection * vc, EThread * thread)
+read_from_net(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
 {
   vc->addLogMessage("read from net");
-  NetState * s = &vc->read;
-  ProxyMutex * mutex = thread->mutex;
+  NetState *s = &vc->read;
+  ProxyMutex *mutex = thread->mutex;
   MIOBufferAccessor & buf = s->vio.buffer;
   int r = 0;
 
@@ -252,7 +253,7 @@
   int niov = 0;
   IOVec tiovec[NET_MAX_IOV];
   if (toread) {
-    IOBufferBlock * b = buf.mbuf->_writer;
+    IOBufferBlock *b = buf.mbuf->_writer;
     do {
       niov = 0;
       rattempted = 0;
@@ -295,7 +296,6 @@
     // check for errors
     if (r <= 0) {
 
-      // If the socket was not ready,add into the WaitList
       if (r == -EAGAIN || r == -ENOTCONN) {
         NET_DEBUG_COUNT_DYN_STAT(net_calls_to_read_nodata_stat, 1);
         vc->addLogMessage("EAGAIN or ENOTCONN");
@@ -324,9 +324,8 @@
 #endif
     s->vio.ndone += r;
     net_activity(vc, thread);
-  } else {
+  } else
     r = 0;
-  }
 
   // Signal read ready, check if user is not done
   if (r) {
@@ -337,10 +336,8 @@
       Debug("ssl", "read_from_net, read finished - signal done");
       return;
     } else {
-      if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT) {
+      if (read_signal_and_update(VC_EVENT_READ_READY, vc) != EVENT_CONT)
         return;
-      }
-      // ink_assert(s->enabled);
       // change of lock... don't look at shared variables!
       if (lock.m.m_ptr != s->vio.mutex.m_ptr) {
         read_reschedule(nh, vc);
@@ -363,7 +360,7 @@
 // Rescheduling the UnixNetVConnection when necessary.
 //
 void
-write_to_net(NetHandler * nh, UnixNetVConnection * vc, PollDescriptor * pd, EThread * thread)
+write_to_net(NetHandler *nh, UnixNetVConnection *vc, PollDescriptor *pd, EThread *thread)
 {
   ProxyMutex *mutex = thread->mutex;
 
@@ -375,7 +372,7 @@
 
 
 void
-write_to_net_io(NetHandler * nh, UnixNetVConnection * vc, EThread * thread)
+write_to_net_io(NetHandler *nh, UnixNetVConnection *vc, EThread *thread)
 {
   vc->addLogMessage("write to net io");
 
@@ -526,7 +523,7 @@
 
 
 VIO *
-UnixNetVConnection::do_io_read(Continuation * c, ink64 nbytes, MIOBuffer * buf)
+UnixNetVConnection::do_io_read(Continuation *c, ink64 nbytes, MIOBuffer *buf)
 {
   ink_assert(!closed);
   if (buf)
@@ -548,7 +545,7 @@
 }
 
 VIO *
-UnixNetVConnection::do_io_write(Continuation * acont, ink64 anbytes, IOBufferReader * abuffer, bool owner)
+UnixNetVConnection::do_io_write(Continuation *acont, ink64 anbytes, IOBufferReader *abuffer, bool owner)
 {
   addLogMessage("do_io_write");
   ink_assert(!closed);
@@ -636,7 +633,7 @@
 }
 
 int
-OOB_callback::retry_OOB_send(int event, Event * e)
+OOB_callback::retry_OOB_send(int event, Event *e)
 {
   (void) event;
   (void) e;
@@ -663,7 +660,7 @@
 }
 
 Action *
-UnixNetVConnection::send_OOB(Continuation * cont, char *buf, int len)
+UnixNetVConnection::send_OOB(Continuation *cont, char *buf, int len)
 {
   UnixNetVConnection *u = (UnixNetVConnection *) this;
   ink_debug_assert(len > 0);
@@ -699,7 +696,7 @@
 // writing.
 //
 void
-UnixNetVConnection::reenable(VIO * vio)
+UnixNetVConnection::reenable(VIO *vio)
 {
   if (STATE_FROM_VIO(vio)->enabled)
     return;
@@ -711,12 +708,14 @@
   if (nh->mutex->thread_holding == t) {
     if (vio == &read.vio) {
       ep.modify(EVENTIO_READ);
+      ep.refresh(EVENTIO_READ);
       if (read.triggered)
         nh->read_ready_list.in_or_enqueue(this);
       else
         nh->read_ready_list.remove(this);
     } else {
       ep.modify(EVENTIO_WRITE);
+      ep.refresh(EVENTIO_WRITE);
       if (write.triggered)
         nh->write_ready_list.in_or_enqueue(this);
       else
@@ -739,12 +738,14 @@
     } else {
       if (vio == &read.vio) {
         ep.modify(EVENTIO_READ);
+	ep.refresh(EVENTIO_READ);
         if (read.triggered)
           nh->read_ready_list.in_or_enqueue(this);
         else
           nh->read_ready_list.remove(this);
       } else {
         ep.modify(EVENTIO_WRITE);
+	ep.refresh(EVENTIO_WRITE);
         if (write.triggered)
           nh->write_ready_list.in_or_enqueue(this);
         else
@@ -755,7 +756,7 @@
 }
 
 void
-UnixNetVConnection::reenable_re(VIO * vio)
+UnixNetVConnection::reenable_re(VIO *vio)
 {
   if (!thread)
     return;
@@ -765,12 +766,14 @@
     set_enabled(vio);
     if (vio == &read.vio) {
       ep.modify(EVENTIO_READ);
+      ep.refresh(EVENTIO_READ);
       if (read.triggered)
         net_read_io(nh, t);
       else
         nh->read_ready_list.remove(this);
     } else {
       ep.modify(EVENTIO_WRITE);
+      ep.refresh(EVENTIO_WRITE);
       if (write.triggered)
         write_to_net(nh, this, NULL, t);
       else
@@ -799,7 +802,7 @@
 // Private methods
 
 void
-UnixNetVConnection::set_enabled(VIO * vio)
+UnixNetVConnection::set_enabled(VIO *vio)
 {
   ink_debug_assert(vio->mutex->thread_holding == this_ethread());
   ink_assert(!closed);
@@ -814,7 +817,7 @@
 }
 
 void
-UnixNetVConnection::net_read_io(NetHandler * nh, EThread * lthread)
+UnixNetVConnection::net_read_io(NetHandler *nh, EThread *lthread)
 {
   read_from_net(nh, this, lthread);
 }
@@ -869,19 +872,19 @@
 }
 
 void
-UnixNetVConnection::readDisable(NetHandler * nh)
+UnixNetVConnection::readDisable(NetHandler *nh)
 {
   read_disable(nh, this);
 }
 
 void
-UnixNetVConnection::readSignalError(NetHandler * nh, int err)
+UnixNetVConnection::readSignalError(NetHandler *nh, int err)
 {
   read_signal_error(nh, this, err);
 }
 
 int
-UnixNetVConnection::readSignalDone(int event, NetHandler * nh)
+UnixNetVConnection::readSignalDone(int event, NetHandler *nh)
 {
   return (read_signal_done(event, nh, this));
 }
@@ -897,19 +900,19 @@
 // without affecting regular net stuff or copying a bunch of code into
 // the header files.
 void
-UnixNetVConnection::readReschedule(NetHandler * nh)
+UnixNetVConnection::readReschedule(NetHandler *nh)
 {
   read_reschedule(nh, this);
 }
 
 void
-UnixNetVConnection::netActivity(EThread * lthread)
+UnixNetVConnection::netActivity(EThread *lthread)
 {
   net_activity(this, lthread);
 }
 
 int
-UnixNetVConnection::startEvent(int event, Event * e)
+UnixNetVConnection::startEvent(int event, Event *e)
 {
   (void) event;
   MUTEX_TRY_LOCK(lock, action_.mutex, e->ethread);
@@ -927,7 +930,7 @@
 }
 
 int
-UnixNetVConnection::acceptEvent(int event, Event * e)
+UnixNetVConnection::acceptEvent(int event, Event *e)
 {
   (void) event;
   thread = e->ethread;
@@ -974,7 +977,7 @@
 // and for active and inactivity timeouts.
 //
 int
-UnixNetVConnection::mainEvent(int event, Event * e)
+UnixNetVConnection::mainEvent(int event, Event *e)
 {
   addLogMessage("main event");
   ink_debug_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL);
@@ -1051,7 +1054,7 @@
 
 
 int
-UnixNetVConnection::connectUp(EThread * t)
+UnixNetVConnection::connectUp(EThread *t)
 {
   addLogMessage("connectUp");
 
@@ -1134,7 +1137,7 @@
 
 
 void
-UnixNetVConnection::free(EThread * t)
+UnixNetVConnection::free(EThread *t)
 {
   NET_DECREMENT_THREAD_DYN_STAT(net_connections_currently_open_stat, t);
   // clear variables for reuse