You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2010/05/20 18:49:10 UTC
svn commit: r946706 - in /trafficserver/traffic/trunk/iocore: aio/ cache/ eventsystem/ net/
Author: jplevyak
Date: Thu May 20 16:49:09 2010
New Revision: 946706
URL: http://svn.apache.org/viewvc?rev=946706&view=rev
Log:
TS-363: Added a fast_signal option for scheduling events on a particular thread.
Used the fast_signal option in AIO and the cache on I/O completion to wake the net thread.
Modified:
trafficserver/traffic/trunk/iocore/aio/AIO.cc
trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc
trafficserver/traffic/trunk/iocore/eventsystem/I_EThread.h
trafficserver/traffic/trunk/iocore/eventsystem/I_ProtectedQueue.h
trafficserver/traffic/trunk/iocore/eventsystem/P_ProtectedQueue.h
trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEThread.h
trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEventProcessor.h
trafficserver/traffic/trunk/iocore/eventsystem/ProtectedQueue.cc
trafficserver/traffic/trunk/iocore/eventsystem/UnixEThread.cc
trafficserver/traffic/trunk/iocore/net/P_UnixNet.h
trafficserver/traffic/trunk/iocore/net/UnixNet.cc
Modified: trafficserver/traffic/trunk/iocore/aio/AIO.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/aio/AIO.cc?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/aio/AIO.cc (original)
+++ trafficserver/traffic/trunk/iocore/aio/AIO.cc Thu May 20 16:49:09 2010
@@ -490,9 +490,9 @@ aio_thread_main(void *arg)
if (!op->action.cancelled)
op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
} else if (op->thread == AIO_CALLBACK_THREAD_ANY)
- eventProcessor.schedule_imm(op);
+ eventProcessor.schedule_imm_signal(op);
else
- op->thread->schedule_imm(op);
+ op->thread->schedule_imm_signal(op);
ink_mutex_acquire(&my_aio_req->aio_mutex);
} while (1);
ink_cond_wait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex);
Modified: trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc (original)
+++ trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc Thu May 20 16:49:09 2010
@@ -359,7 +359,7 @@ Part::aggWriteDone(int event, Event *e)
CacheVC *c = 0;
while ((c = sync.dequeue())) {
if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
- c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
+ c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
else {
sync.push(c); // put it back on the front
break;
@@ -1017,7 +1017,7 @@ Lagain:
ink_assert(false);
while ((c = agg.dequeue())) {
agg_todo_size -= c->agg_len;
- c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
+ c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
}
return EVENT_CONT;
}
@@ -1077,7 +1077,7 @@ Lwait:
if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
ret = EVENT_RETURN;
else
- c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
+ c->initial_thread->schedule_imm_signal(c, AIO_EVENT_DONE);
}
return ret;
}
Modified: trafficserver/traffic/trunk/iocore/eventsystem/I_EThread.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/I_EThread.h?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/I_EThread.h (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/I_EThread.h Thu May 20 16:49:09 2010
@@ -112,6 +112,7 @@ public:
*/
Event * schedule_imm(Continuation * c, int callback_event = EVENT_IMMEDIATE, void *cookie = NULL);
+ Event * schedule_imm_signal(Continuation * c, int callback_event = EVENT_IMMEDIATE, void *cookie = NULL);
/**
Schedules the continuation on this EThread to receive an event
@@ -295,18 +296,18 @@ public:
EThread();
EThread(ThreadType att, int anid);
- EThread(ThreadType att, Event * e, ink_sem * sem);
+ EThread(ThreadType att, Event *e, ink_sem *sem);
virtual ~ EThread();
- Event *schedule_spawn(Continuation * cont);
+ Event *schedule_spawn(Continuation *cont);
- Event *schedule(Event * e);
+ Event *schedule(Event *e, bool fast_signal = false);
/** Block of memory to allocate thread specific data e.g. stat system arrays. */
char thread_private[PER_THREAD_DATA];
// private data for UDP net processor
- //UDPNetHandler * udpNetHandler;
+ //UDPNetHandler *udpNetHandler;
/** Private Data for the Disk Processor. */
DiskHandler *diskHandler;
Modified: trafficserver/traffic/trunk/iocore/eventsystem/I_ProtectedQueue.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/I_ProtectedQueue.h?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/I_ProtectedQueue.h (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/I_ProtectedQueue.h Thu May 20 16:49:09 2010
@@ -46,25 +46,11 @@ struct ProtectedQueue
void remove(Event * e);
Event *dequeue_local();
void dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep);
-#if defined(USE_OLD_EVENTFD)
- void setWriteFd(int );
- void setReadFd(int );
- int getReadFd();
-#endif
- void setWriteFd(int );
- void setReadFd(int );
- int getReadFd();
InkAtomicList al;
ink_mutex lock;
ink_cond might_have_data;
Que(Event, link) localQueue;
-#if defined(USE_OLD_EVENTFD)
- int write_pipe_fd;
- int read_pipe_fd;
-#endif
- int write_pipe_fd;
- int read_pipe_fd;
ProtectedQueue();
};
Modified: trafficserver/traffic/trunk/iocore/eventsystem/P_ProtectedQueue.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/P_ProtectedQueue.h?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/P_ProtectedQueue.h (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/P_ProtectedQueue.h Thu May 20 16:49:09 2010
@@ -34,10 +34,7 @@
TS_INLINE
-ProtectedQueue::ProtectedQueue():write_pipe_fd(-1),read_pipe_fd(-1)
-#if defined(USE_OLD_EVENTFD)
-:write_pipe_fd(-1),read_pipe_fd(-1)
-#endif
+ProtectedQueue::ProtectedQueue()
{
Event e;
ink_mutex_init(&lock, "ProtectedQueue");
@@ -49,48 +46,22 @@ TS_INLINE void
ProtectedQueue::signal()
{
// Need to get the lock before you can signal the thread
-#if defined(USE_OLD_EVENTFD)
- if(write_pipe_fd!=-1) {
- int retVal = socketManager.write(write_pipe_fd,(void*)"W",1);
- if(retVal <= 0) {
- int fd = write_pipe_fd;
- socketManager.close(fd);
- }
- } else {
-#endif
- ink_mutex_acquire(&lock);
- ink_cond_signal(&might_have_data);
- ink_mutex_release(&lock);
-#if defined(USE_OLD_EVENTFD)
- }
-#endif
+ ink_mutex_acquire(&lock);
+ ink_cond_signal(&might_have_data);
+ ink_mutex_release(&lock);
}
TS_INLINE int
ProtectedQueue::try_signal()
{
// Need to get the lock before you can signal the thread
-#if defined(USE_OLD_EVENTFD)
- if(write_pipe_fd!=-1) {
- int retVal = socketManager.write(write_pipe_fd,(void*)"W",1);
- if(retVal <= 0) {
- int fd = write_pipe_fd;
- write_pipe_fd = read_pipe_fd = -1;
- socketManager.close(fd);
- }
+ if (ink_mutex_try_acquire(&lock)) {
+ ink_cond_signal(&might_have_data);
+ ink_mutex_release(&lock);
return 1;
} else {
-#endif
- if (ink_mutex_try_acquire(&lock)) {
- ink_cond_signal(&might_have_data);
- ink_mutex_release(&lock);
- return 1;
- } else {
- return 0;
- }
-#if defined(USE_OLD_EVENTFD)
+ return 0;
}
-#endif
}
// Called from the same thread (don't need to signal)
@@ -122,28 +93,4 @@ ProtectedQueue::dequeue_local()
return e;
}
-#if defined(USE_OLD_EVENTFD)
-TS_INLINE void
-ProtectedQueue::setReadFd(int fd)
-{
- read_pipe_fd = fd;
-}
-
-TS_INLINE void
-ProtectedQueue::setWriteFd(int fd)
-{
- write_pipe_fd = fd;
-}
-
-TS_INLINE int
-ProtectedQueue::getReadFd()
-{
- int pfd[2] = {-1,-1};
- ink_create_pipe(pfd);
- setReadFd(pfd[0]);
- setWriteFd(pfd[1]);
- return pfd[0];
-}
-#endif /* USE_OLD_EVENTFD */
-
#endif
Modified: trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEThread.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEThread.h?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEThread.h (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEThread.h Thu May 20 16:49:09 2010
@@ -54,6 +54,15 @@ EThread::schedule_imm(Continuation * con
}
TS_INLINE Event *
+EThread::schedule_imm_signal(Continuation * cont, int callback_event, void *cookie)
+{
+ Event *e =::eventAllocator.alloc();
+ e->callback_event = callback_event;
+ e->cookie = cookie;
+ return schedule(e->init(cont, 0, 0), true);
+}
+
+TS_INLINE Event *
EThread::schedule_at(Continuation * cont, ink_hrtime t, int callback_event, void *cookie)
{
Event *e =::eventAllocator.alloc();
@@ -81,7 +90,7 @@ EThread::schedule_every(Continuation * c
}
TS_INLINE Event *
-EThread::schedule(Event * e)
+EThread::schedule(Event * e, bool fast_signal)
{
e->ethread = this;
ink_assert(tt == REGULAR);
@@ -90,7 +99,7 @@ EThread::schedule(Event * e)
else
e->mutex = e->continuation->mutex = e->ethread->mutex;
ink_assert(e->mutex.m_ptr);
- EventQueueExternal.enqueue(e);
+ EventQueueExternal.enqueue(e, fast_signal);
return e;
}
Modified: trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEventProcessor.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEventProcessor.h?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEventProcessor.h (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/P_UnixEventProcessor.h Thu May 20 16:49:09 2010
@@ -77,7 +77,7 @@ EventProcessor::schedule(Event * e, Even
e->mutex = e->continuation->mutex;
else
e->mutex = e->continuation->mutex = e->ethread->mutex;
- e->ethread->EventQueueExternal.enqueue(e,fast_signal);
+ e->ethread->EventQueueExternal.enqueue(e, fast_signal);
return e;
}
@@ -91,7 +91,7 @@ EventProcessor::schedule_imm_signal(Cont
#endif
e->callback_event = callback_event;
e->cookie = cookie;
- return schedule(e->init(cont, 0, 0), et,true);
+ return schedule(e->init(cont, 0, 0), et, true);
}
TS_INLINE Event *
Modified: trafficserver/traffic/trunk/iocore/eventsystem/ProtectedQueue.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/ProtectedQueue.cc?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/ProtectedQueue.cc (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/ProtectedQueue.cc Thu May 20 16:49:09 2010
@@ -40,12 +40,12 @@
// Defining EAGER_SIGNALLING disables this behavior and causes
// threads to be made runnable immediately.
//
-//#define EAGER_SIGNALLING
+// #define EAGER_SIGNALLING
extern ClassAllocator<Event> eventAllocator;
void
-ProtectedQueue::enqueue(Event * e , bool fast_signal)
+ProtectedQueue::enqueue(Event *e , bool fast_signal)
{
ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
EThread *e_ethread = e->ethread;
@@ -64,13 +64,11 @@ ProtectedQueue::enqueue(Event * e , bool
e_ethread->signal_hook(e_ethread);
}
} else {
-
#ifdef EAGER_SIGNALLING
// Try to signal now and avoid deferred posting.
if (e_ethread->EventQueueExternal.try_signal())
return;
#endif
-
if (fast_signal) {
if (e_ethread->signal_hook)
e_ethread->signal_hook(e_ethread);
Modified: trafficserver/traffic/trunk/iocore/eventsystem/UnixEThread.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/eventsystem/UnixEThread.cc?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/eventsystem/UnixEThread.cc (original)
+++ trafficserver/traffic/trunk/iocore/eventsystem/UnixEThread.cc Thu May 20 16:49:09 2010
@@ -71,12 +71,10 @@ EThread::EThread(ThreadType att, int ani
if (evfd < 0) {
if (errno == EINVAL) { // flags invalid for kernel <= 2.6.26
evfd = eventfd(0,0);
- if (evfd < 0) {
+ if (evfd < 0)
Fatal("EThread::EThread: %d=eventfd(0,0),errno(%d)",evfd,errno);
- }
- } else {
+ } else
Fatal("EThread::EThread: %d=eventfd(0,O_NONBLOCK | FD_CLOEXEC),errno(%d)",evfd,errno);
- }
}
fcntl(evfd, F_SETFD, FD_CLOEXEC);
fcntl(evfd, F_SETFL, O_NONBLOCK);
Modified: trafficserver/traffic/trunk/iocore/net/P_UnixNet.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/net/P_UnixNet.h?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/net/P_UnixNet.h (original)
+++ trafficserver/traffic/trunk/iocore/net/P_UnixNet.h Thu May 20 16:49:09 2010
@@ -125,9 +125,6 @@ struct EventIO
DNSConnection *dnscon;
NetAccept *na;
UnixUDPConnection *uc;
-#if defined(USE_OLD_EVENTFD)
- int fd;
-#endif
} data;
int start(EventLoop l, DNSConnection *vc, int events);
int start(EventLoop l, NetAccept *vc, int events);
Modified: trafficserver/traffic/trunk/iocore/net/UnixNet.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/net/UnixNet.cc?rev=946706&r1=946705&r2=946706&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/net/UnixNet.cc (original)
+++ trafficserver/traffic/trunk/iocore/net/UnixNet.cc Thu May 20 16:49:09 2010
@@ -184,7 +184,7 @@ net_signal_hook_function(EThread *thread
uint64 counter = 1;
NOWARN_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64)));
#else
- char dummy;
+ char dummy = 1;
NOWARN_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
#endif
}
@@ -214,9 +214,6 @@ initialize_thread_for_net(EThread *threa
else
pd->eio = ev_loop_new(LIBEV_BACKEND_LIST);
#endif
-#if defined(USE_OLD_EVENTFD)
- initialize_eventfd(thread);
-#endif
thread->schedule_imm(get_NetHandler(thread));
#ifndef INACTIVITY_TIMEOUT
@@ -234,28 +231,6 @@ initialize_thread_for_net(EThread *threa
#endif
}
-#if defined(USE_OLD_EVENTFD)
-void initialize_eventfd(EThread *thread) {
-
- int fd = thread->getEventFd();
- PollDescriptor *pd = get_PollDescriptor(thread);
- struct epoll_data_ptr *eptr;
- eptr = (struct epoll_data_ptr *) xmalloc(sizeof(struct epoll_data_ptr));
- eptr->type = EVENTFD;
- eptr->data.fd = fd;
-
-
- struct epoll_event ev;
- memset(&ev, 0, sizeof(struct epoll_event));
-
- ev.events = EPOLLIN | EPOLLET;
- ev.data.ptr = eptr;
- if (epoll_ctl(pd->epoll_fd, EPOLL_CTL_ADD, fd, &ev) < 0) {
- Debug("iocore_net", "acceptEvent : Failed to add event add to list epoll list\n");
- }
-}
-#endif /* USE_OLD_EVENTFD */
-
// NetHandler method definitions
NetHandler::NetHandler():Continuation(NULL), trigger_event(0)
@@ -419,33 +394,8 @@ NetHandler::mainNetEvent(int event, Even
#endif
}
}
-#if !defined(USE_OLD_EVENTFD)
- else if (epd->type == EVENTIO_ASYNC_SIGNAL){
+ else if (epd->type == EVENTIO_ASYNC_SIGNAL)
net_signal_hook_callback(trigger_event->ethread);
- }
-#else /* USE_OLD_EVENTFD */
- else if (epd->type == EVENTFD) { // use: EVENTIO_ASYNC_SIGNAL
- char buf[1024];
- int retVal=-1;
- do {
- retVal = socketManager.read(epd->data.fd,&buf,1024);
- } while(retVal == 1024);
- if (retVal <=0) {
- socketManager.close(epd->data.fd);
- initialize_eventfd(e->ethread);
- }
- } else if (epd->type == EVENTFD) {
- char buf[1024];
- int retVal=-1;
- do {
- retVal = socketManager.read(epd->data.fd,&buf,1024);
- } while(retVal == 1024);
- if (retVal <=0) {
- socketManager.close(epd->data.fd);
- initialize_eventfd(e->ethread);
- }
- }
-#endif /* USE_OLD_EVENTFD */
ev_next_event(pd,x);
}