You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2017/10/19 12:27:20 UTC
[trafficserver] branch master updated: event loop changes
This is an automated email from the ASF dual-hosted git repository.
amc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 39992bb event loop changes
39992bb is described below
commit 39992bb300dce9d4ebcbfdb76d78efd7fe4b72eb
Author: Fei Deng <du...@gmail.com>
AuthorDate: Tue Aug 29 12:54:06 2017 -0500
event loop changes
removed unnecessary defines
moved to class initializer
removed metrics collection
extracted external queue processing code into new method
use std::min instead of just min
bug fix
remove unnecessary arg
---
iocore/eventsystem/I_EThread.h | 53 ++++++++-
iocore/eventsystem/I_ProtectedQueue.h | 2 +
iocore/eventsystem/P_UnixEThread.h | 6 +
iocore/eventsystem/ProtectedQueue.cc | 93 ++++-----------
iocore/eventsystem/UnixEThread.cc | 211 +++++++++++++++-------------------
iocore/net/P_UnixNet.h | 12 +-
iocore/net/UnixNet.cc | 77 +++++++------
iocore/net/UnixNetVConnection.cc | 4 +-
8 files changed, 226 insertions(+), 232 deletions(-)
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index c916c1f..1739df2 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -86,6 +86,27 @@ extern bool shutdown_event_system;
class EThread : public Thread
{
public:
+ /** Handler for tail of event loop.
+
+ The event loop should not spin. To avoid that, a tail handler is called to block for a limited time.
+ This is a protocol class that defines the interface to the handler.
+ */
+ class LoopTailHandler
+ {
+ public:
+ /** Called at the end of the event loop to block.
+ @a timeout is the maximum length of time (in ns) to block.
+ */
+ virtual int waitForActivity(ink_hrtime timeout) = 0;
+ /** Unblock.
+
+ This is required to unblock (wake up) the block created by calling @a waitForActivity.
+ */
+ virtual void signalActivity() = 0;
+
+ virtual ~LoopTailHandler() {}
+ };
+
/*-------------------------------------------------------*\
| Common Interface |
\*-------------------------------------------------------*/
@@ -262,6 +283,9 @@ public:
*/
Event *schedule_spawn(Continuation *c, int ev = EVENT_IMMEDIATE, void *cookie = nullptr);
+ // Set the tail handler.
+ void set_tail_handler(LoopTailHandler *handler);
+
/* private */
Event *schedule_local(Event *e);
@@ -305,9 +329,11 @@ public:
// Private Interface
void execute() override;
+ void execute_regular();
+ void process_queue(Que(Event, link) * NegativeQueue);
void process_event(Event *e, int calling_code);
void free_event(Event *e);
- void (*signal_hook)(EThread *) = nullptr;
+ LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;
#if HAVE_EVENTFD
int evfd = ts::NO_FD;
@@ -328,6 +354,31 @@ public:
Event *start_event = nullptr;
ServerSessionPool *server_session_pool = nullptr;
+
+ /** Default handler used until it is replaced via @a set_tail_handler.
+
+ This uses the cond var wait in @a EventQueueExternal.
+ */
+ class DefaultTailHandler : public LoopTailHandler
+ {
+ DefaultTailHandler(ProtectedQueue &q) : _q(q) {} // private: only the befriended EThread can construct one
+
+ int
+ waitForActivity(ink_hrtime timeout)
+ {
+ _q.wait(Thread::get_hrtime() + timeout); // wait() is handed now + timeout, i.e. a point in time rather than a duration
+ return 0; // always reports 0
+ }
+ void
+ signalActivity()
+ {
+ _q.signal(); // signal the queue this handler waits on
+ }
+
+ ProtectedQueue &_q; // queue whose wait()/signal() implement the handler
+
+ friend class EThread; // everything above is private, so EThread needs friendship to use it
+ } DEFAULT_TAIL_HANDLER = EventQueueExternal; // default instance, bound to this thread's external queue
};
/**
diff --git a/iocore/eventsystem/I_ProtectedQueue.h b/iocore/eventsystem/I_ProtectedQueue.h
index 7dfba98..483c872 100644
--- a/iocore/eventsystem/I_ProtectedQueue.h
+++ b/iocore/eventsystem/I_ProtectedQueue.h
@@ -45,6 +45,8 @@ struct ProtectedQueue {
void remove(Event *e);
Event *dequeue_local();
void dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep);
+ void dequeue_external(); // Dequeue any external events.
+ void wait(ink_hrtime timeout); // Wait on a condition variable until the absolute time @a timeout if there are no events.
InkAtomicList al;
ink_mutex lock;
diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h
index 0b68e5e..e0135a6 100644
--- a/iocore/eventsystem/P_UnixEThread.h
+++ b/iocore/eventsystem/P_UnixEThread.h
@@ -178,4 +178,10 @@ EThread::free_event(Event *e)
EVENT_FREE(e, eventAllocator, this);
}
+TS_INLINE void
+EThread::set_tail_handler(LoopTailHandler *handler)
+{
+ ink_atomic_swap(&tail_cb, handler); // install @a handler as the loop tail handler; the swap's result is not used
+}
+
#endif /*_EThread_h_*/
diff --git a/iocore/eventsystem/ProtectedQueue.cc b/iocore/eventsystem/ProtectedQueue.cc
index c0e1d8e..bb9466b 100644
--- a/iocore/eventsystem/ProtectedQueue.cc
+++ b/iocore/eventsystem/ProtectedQueue.cc
@@ -56,55 +56,7 @@ ProtectedQueue::enqueue(Event *e, bool fast_signal)
// queue e->ethread in the list of threads to be signalled
// inserting_thread == 0 means it is not a regular EThread
if (inserting_thread != e_ethread) {
- if (!inserting_thread || !inserting_thread->ethreads_to_be_signalled) {
- signal();
- if (fast_signal) {
- if (e_ethread->signal_hook) {
- e_ethread->signal_hook(e_ethread);
- }
- }
- } else {
-#ifdef EAGER_SIGNALLING
- // Try to signal now and avoid deferred posting.
- if (e_ethread->EventQueueExternal.try_signal())
- return;
-#endif
- if (fast_signal) {
- if (e_ethread->signal_hook) {
- e_ethread->signal_hook(e_ethread);
- }
- }
- int &t = inserting_thread->n_ethreads_to_be_signalled;
- EThread **sig_e = inserting_thread->ethreads_to_be_signalled;
- if ((t + 1) >= eventProcessor.n_ethreads) {
- // we have run out of room
- if ((t + 1) == eventProcessor.n_ethreads) {
- // convert to direct map, put each ethread (sig_e[i]) into
- // the direct map loation: sig_e[sig_e[i]->id]
- for (int i = 0; i < t; i++) {
- EThread *cur = sig_e[i]; // put this ethread
- while (cur) {
- EThread *next = sig_e[cur->id]; // into this location
- if (next == cur) {
- break;
- }
- sig_e[cur->id] = cur;
- cur = next;
- }
- // if not overwritten
- if (sig_e[i] && sig_e[i]->id != i) {
- sig_e[i] = nullptr;
- }
- }
- t++;
- }
- // we have a direct map, insert this EThread
- sig_e[e_ethread->id] = e_ethread;
- } else {
- // insert into vector
- sig_e[t++] = e_ethread;
- }
- }
+ e_ethread->tail_cb->signalActivity();
}
}
}
@@ -119,24 +71,9 @@ flush_signals(EThread *thr)
}
int i;
-// Since the lock is only there to prevent a race in ink_cond_timedwait
-// the lock is taken only for a short time, thus it is unlikely that
-// this code has any effect.
-#ifdef EAGER_SIGNALLING
for (i = 0; i < n; i++) {
- // Try to signal as many threads as possible without blocking.
if (thr->ethreads_to_be_signalled[i]) {
- if (thr->ethreads_to_be_signalled[i]->EventQueueExternal.try_signal())
- thr->ethreads_to_be_signalled[i] = 0;
- }
- }
-#endif
- for (i = 0; i < n; i++) {
- if (thr->ethreads_to_be_signalled[i]) {
- thr->ethreads_to_be_signalled[i]->EventQueueExternal.signal();
- if (thr->ethreads_to_be_signalled[i]->signal_hook) {
- thr->ethreads_to_be_signalled[i]->signal_hook(thr->ethreads_to_be_signalled[i]);
- }
+ thr->ethreads_to_be_signalled[i]->tail_cb->signalActivity();
thr->ethreads_to_be_signalled[i] = nullptr;
}
}
@@ -147,17 +84,16 @@ void
ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
{
(void)cur_time;
- Event *e;
if (sleep) {
- ink_mutex_acquire(&lock);
- if (INK_ATOMICLIST_EMPTY(al)) {
- timespec ts = ink_hrtime_to_timespec(timeout);
- ink_cond_timedwait(&might_have_data, &lock, &ts);
- }
- ink_mutex_release(&lock);
+ this->wait(timeout);
}
+ this->dequeue_external();
+}
- e = (Event *)ink_atomiclist_popall(&al);
+void
+ProtectedQueue::dequeue_external()
+{
+ Event *e = (Event *)ink_atomiclist_popall(&al);
// invert the list, to preserve order
SLL<Event, Event::Link_link> l, t;
t.head = e;
@@ -174,3 +110,14 @@ ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool slee
}
}
}
+
+void
+ProtectedQueue::wait(ink_hrtime timeout)
+{
+ ink_mutex_acquire(&lock); // held across the emptiness check and the timed wait
+ if (INK_ATOMICLIST_EMPTY(al)) { // only block when no external event is queued
+ timespec ts = ink_hrtime_to_timespec(timeout); // NOTE(review): callers pass now + delay, so @a timeout looks like an absolute time - confirm
+ ink_cond_timedwait(&might_have_data, &lock, &ts);
+ }
+ ink_mutex_release(&lock);
+}
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index 6544e60..82d074f 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -142,6 +142,97 @@ EThread::process_event(Event *e, int calling_code)
}
}
+void EThread::process_queue(Que(Event, link) * NegativeQueue)
+{
+ Event *e;
+
+ // Move events from the external thread safe queues to the local queue.
+ EventQueueExternal.dequeue_external();
+
+ // execute all the available external events that have
+ // already been dequeued
+ while ((e = EventQueueExternal.dequeue_local())) {
+ if (e->cancelled) {
+ free_event(e);
+ } else if (!e->timeout_at) { // IMMEDIATE
+ ink_assert(e->period == 0);
+ process_event(e, e->callback_event);
+ } else if (e->timeout_at > 0) { // INTERVAL
+ EventQueue.enqueue(e, cur_time);
+ } else { // NEGATIVE
+ Event *p = nullptr; // last entry passed over while searching for the insertion point
+ Event *a = NegativeQueue->head;
+ while (a && a->timeout_at > e->timeout_at) { // skip entries whose timeout_at is greater than the new event's
+ p = a;
+ a = a->link.next;
+ }
+ if (!a) { // walked past the last entry (or the queue is empty)
+ NegativeQueue->enqueue(e);
+ } else {
+ NegativeQueue->insert(e, p); // NOTE(review): presumably places e directly after p - confirm against Que::insert
+ }
+ }
+ }
+}
+
+void
+EThread::execute_regular()
+{
+ Event *e;
+ Que(Event, link) NegativeQueue;
+ ink_hrtime next_time = 0;
+
+ // give priority to immediate events
+ for (;;) {
+ if (unlikely(shutdown_event_system == true)) { // the only exit from the loop
+ return;
+ }
+
+ process_queue(&NegativeQueue); // run or re-queue everything that arrived on the external queue
+
+ bool done_one;
+ do {
+ done_one = false;
+ // execute all the eligible internal events
+ EventQueue.check_ready(cur_time, this);
+ while ((e = EventQueue.dequeue_ready(cur_time))) {
+ ink_assert(e);
+ ink_assert(e->timeout_at > 0);
+ if (e->cancelled)
+ free_event(e);
+ else {
+ done_one = true;
+ process_event(e, e->callback_event);
+ }
+ }
+ } while (done_one); // repeat while the last pass ran at least one event
+
+ // execute any negative (poll) events
+ if (NegativeQueue.head) {
+ process_queue(&NegativeQueue); // pick up external events again before running the poll events
+
+ // execute poll events
+ while ((e = NegativeQueue.dequeue())) {
+ process_event(e, EVENT_POLL);
+ }
+ }
+
+ next_time = EventQueue.earliest_timeout();
+ ink_hrtime sleep_time = next_time - Thread::get_hrtime_updated();
+ if (sleep_time > 0) {
+ sleep_time = std::min(sleep_time, HRTIME_MSECONDS(THREAD_MAX_HEARTBEAT_MSECONDS)); // cap the wait at the heartbeat interval
+ } else {
+ sleep_time = 0; // earliest timeout is not in the future: ask for a zero-length wait
+ }
+
+ if (n_ethreads_to_be_signalled) {
+ flush_signals(this); // signal the threads recorded for wake-up before this thread blocks
+ }
+
+ tail_cb->waitForActivity(sleep_time); // block in the installed tail handler for at most sleep_time
+ }
+}
+
//
// void EThread::execute()
//
@@ -168,128 +259,12 @@ EThread::execute()
switch (tt) {
case REGULAR: {
- Event *e;
- Que(Event, link) NegativeQueue;
- ink_hrtime next_time = 0;
-
- // give priority to immediate events
- for (;;) {
- if (unlikely(shutdown_event_system == true)) {
- return;
- }
- // execute all the available external events that have
- // already been dequeued
- cur_time = Thread::get_hrtime_updated();
- while ((e = EventQueueExternal.dequeue_local())) {
- if (e->cancelled) {
- free_event(e);
- } else if (!e->timeout_at) { // IMMEDIATE
- ink_assert(e->period == 0);
- process_event(e, e->callback_event);
- } else if (e->timeout_at > 0) { // INTERVAL
- EventQueue.enqueue(e, cur_time);
- } else { // NEGATIVE
- Event *p = nullptr;
- Event *a = NegativeQueue.head;
- while (a && a->timeout_at > e->timeout_at) {
- p = a;
- a = a->link.next;
- }
- if (!a) {
- NegativeQueue.enqueue(e);
- } else {
- NegativeQueue.insert(e, p);
- }
- }
- }
- bool done_one;
- do {
- done_one = false;
- // execute all the eligible internal events
- EventQueue.check_ready(cur_time, this);
- while ((e = EventQueue.dequeue_ready(cur_time))) {
- ink_assert(e);
- ink_assert(e->timeout_at > 0);
- if (e->cancelled) {
- free_event(e);
- } else {
- done_one = true;
- process_event(e, e->callback_event);
- }
- }
- } while (done_one);
- // execute any negative (poll) events
- if (NegativeQueue.head) {
- if (n_ethreads_to_be_signalled) {
- flush_signals(this);
- }
- // dequeue all the external events and put them in a local
- // queue. If there are no external events available, don't
- // do a cond_timedwait.
- if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al)) {
- EventQueueExternal.dequeue_timed(cur_time, next_time, false);
- }
- while ((e = EventQueueExternal.dequeue_local())) {
- if (!e->timeout_at) {
- process_event(e, e->callback_event);
- } else {
- if (e->cancelled) {
- free_event(e);
- } else {
- // If its a negative event, it must be a result of
- // a negative event, which has been turned into a
- // timed-event (because of a missed lock), executed
- // before the poll. So, it must
- // be executed in this round (because you can't have
- // more than one poll between two executions of a
- // negative event)
- if (e->timeout_at < 0) {
- Event *p = nullptr;
- Event *a = NegativeQueue.head;
- while (a && a->timeout_at > e->timeout_at) {
- p = a;
- a = a->link.next;
- }
- if (!a) {
- NegativeQueue.enqueue(e);
- } else {
- NegativeQueue.insert(e, p);
- }
- } else {
- EventQueue.enqueue(e, cur_time);
- }
- }
- }
- }
- // execute poll events
- while ((e = NegativeQueue.dequeue())) {
- process_event(e, EVENT_POLL);
- }
- if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al)) {
- EventQueueExternal.dequeue_timed(cur_time, next_time, false);
- }
- } else { // Means there are no negative events
- next_time = EventQueue.earliest_timeout();
- ink_hrtime sleep_time = next_time - cur_time;
-
- if (sleep_time > THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND) {
- next_time = cur_time + THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
- }
- // dequeue all the external events and put them in a local
- // queue. If there are no external events available, do a
- // cond_timedwait.
- if (n_ethreads_to_be_signalled) {
- flush_signals(this);
- }
- EventQueueExternal.dequeue_timed(cur_time, next_time, true);
- }
- }
+ this->execute_regular();
+ break;
}
-
case DEDICATED: {
break;
}
-
default:
ink_assert(!"bad case value (execute)");
break;
diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h
index c0f6338..aea295a 100644
--- a/iocore/net/P_UnixNet.h
+++ b/iocore/net/P_UnixNet.h
@@ -220,9 +220,12 @@ struct PollCont : public Continuation {
// A NetHandler handles the Network IO operations. It maintains
// lists of operations at multiples of it's periodicity.
//
-class NetHandler : public Continuation
+class NetHandler : public Continuation, public EThread::LoopTailHandler
{
public:
+ // @a thread and @a trigger_event are redundant - you can get the former from the latter.
+ // If we don't get rid of @a trigger_event we should remove @a thread.
+ EThread *thread;
Event *trigger_event;
QueM(UnixNetVConnection, NetState, read, ready_link) read_ready_list;
QueM(UnixNetVConnection, NetState, write, ready_link) write_ready_list;
@@ -247,7 +250,7 @@ public:
int startNetEvent(int event, Event *data);
int mainNetEvent(int event, Event *data);
- int mainNetEventExt(int event, Event *data);
+ int waitForActivity(ink_hrtime timeout);
void process_enabled_list();
void process_ready_list();
void manage_keep_alive_queue();
@@ -296,6 +299,9 @@ public:
*/
void stopCop(UnixNetVConnection *netvc);
+ // Signal the epoll_wait to terminate.
+ void signalActivity();
+
/**
Release a netvc and free it.
@@ -742,7 +748,7 @@ NetHandler::startIO(UnixNetVConnection *netvc)
ink_assert(netvc->thread == this_ethread());
int res = 0;
- PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread);
+ PollDescriptor *pd = get_PollDescriptor(this->thread);
if (netvc->ep.start(pd, netvc, EVENTIO_READ | EVENTIO_WRITE) < 0) {
res = errno;
// EEXIST should be ok, though it should have been cleared before we got back here
diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc
index 631b4b2..94ac15b 100644
--- a/iocore/net/UnixNet.cc
+++ b/iocore/net/UnixNet.cc
@@ -202,31 +202,18 @@ net_signal_hook_callback(EThread *thread)
#endif
}
-static void
-net_signal_hook_function(EThread *thread)
-{
-#if HAVE_EVENTFD
- uint64_t counter = 1;
- ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
-#elif TS_USE_PORT
- PollDescriptor *pd = get_PollDescriptor(thread);
- ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
-#else
- char dummy = 1;
- ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
-#endif
-}
-
void
initialize_thread_for_net(EThread *thread)
{
- new ((ink_dummy_for_new *)get_NetHandler(thread)) NetHandler();
- new ((ink_dummy_for_new *)get_PollCont(thread)) PollCont(thread->mutex, get_NetHandler(thread));
- get_NetHandler(thread)->mutex = new_ProxyMutex();
- PollCont *pc = get_PollCont(thread);
- PollDescriptor *pd = pc->pollDescriptor;
+ NetHandler *nh = get_NetHandler(thread);
+
+ new ((ink_dummy_for_new *)nh) NetHandler();
+ new ((ink_dummy_for_new *)get_PollCont(thread)) PollCont(thread->mutex, nh);
+ nh->mutex = new_ProxyMutex();
+ nh->thread = thread;
- thread->schedule_imm(get_NetHandler(thread));
+ PollCont *pc = get_PollCont(thread);
+ PollDescriptor *pd = pc->pollDescriptor;
InactivityCop *inactivityCop = new InactivityCop(get_NetHandler(thread)->mutex);
int cop_freq = 1;
@@ -234,9 +221,9 @@ initialize_thread_for_net(EThread *thread)
REC_ReadConfigInteger(cop_freq, "proxy.config.net.inactivity_check_frequency");
thread->schedule_every(inactivityCop, HRTIME_SECONDS(cop_freq));
- thread->signal_hook = net_signal_hook_function;
- thread->ep = (EventIO *)ats_malloc(sizeof(EventIO));
- thread->ep->type = EVENTIO_ASYNC_SIGNAL;
+ thread->set_tail_handler(nh);
+ thread->ep = (EventIO *)ats_malloc(sizeof(EventIO));
+ thread->ep->type = EVENTIO_ASYNC_SIGNAL;
#if HAVE_EVENTFD
thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ);
#else
@@ -311,7 +298,7 @@ update_nethandler_config(const char *name, RecDataT data_type ATS_UNUSED, RecDat
void
NetHandler::free_netvc(UnixNetVConnection *netvc)
{
- EThread *t = trigger_event->ethread;
+ EThread *t = this->thread;
ink_assert(t == this_ethread());
ink_release_assert(netvc->thread == t);
@@ -357,9 +344,7 @@ NetHandler::startNetEvent(int event, Event *e)
configure_per_thread();
(void)event;
- SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
- e->schedule_every(-HRTIME_MSECONDS(net_event_period));
- trigger_event = e;
+
return EVENT_CONT;
}
@@ -408,7 +393,7 @@ NetHandler::process_ready_list()
if (vc->closed)
free_netvc(vc);
else if (vc->read.enabled && vc->read.triggered)
- vc->net_read_io(this, trigger_event->ethread);
+ vc->net_read_io(this, this->thread);
else if (!vc->read.enabled) {
read_ready_list.remove(vc);
#if defined(solaris)
@@ -425,7 +410,7 @@ NetHandler::process_ready_list()
if (vc->closed)
free_netvc(vc);
else if (vc->write.enabled && vc->write.triggered)
- write_to_net(this, vc, trigger_event->ethread);
+ write_to_net(this, vc, this->thread);
else if (!vc->write.enabled) {
write_ready_list.remove(vc);
#if defined(solaris)
@@ -443,7 +428,7 @@ NetHandler::process_ready_list()
if (vc->closed)
free_netvc(vc);
else if (vc->read.enabled && vc->read.triggered)
- vc->net_read_io(this, trigger_event->ethread);
+ vc->net_read_io(this, this->thread);
else if (!vc->read.enabled)
vc->ep.modify(-EVENTIO_READ);
}
@@ -452,7 +437,7 @@ NetHandler::process_ready_list()
if (vc->closed)
free_netvc(vc);
else if (vc->write.enabled && vc->write.triggered)
- write_to_net(this, vc, trigger_event->ethread);
+ write_to_net(this, vc, this->thread);
else if (!vc->write.enabled)
vc->ep.modify(-EVENTIO_WRITE);
}
@@ -469,18 +454,25 @@ NetHandler::mainNetEvent(int event, Event *e)
ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
(void)event;
(void)e;
+ return this->waitForActivity(-1);
+}
+
+int
+NetHandler::waitForActivity(ink_hrtime timeout)
+{
EventIO *epd = nullptr;
NET_INCREMENT_DYN_STAT(net_handler_run_stat);
+ SCOPED_MUTEX_LOCK(lock, mutex, this->thread);
process_enabled_list();
// Polling event by PollCont
- PollCont *p = get_PollCont(trigger_event->ethread);
+ PollCont *p = get_PollCont(this->thread);
p->handleEvent(EVENT_NONE, nullptr);
// Get & Process polling result
- PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread);
+ PollDescriptor *pd = get_PollDescriptor(this->thread);
UnixNetVConnection *vc = nullptr;
for (int x = 0; x < pd->result; x++) {
epd = (EventIO *)get_ev_data(pd, x);
@@ -521,7 +513,7 @@ NetHandler::mainNetEvent(int event, Event *e)
#endif
}
} else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
- net_signal_hook_callback(trigger_event->ethread);
+ net_signal_hook_callback(this->thread);
}
ev_next_event(pd, x);
}
@@ -533,6 +525,21 @@ NetHandler::mainNetEvent(int event, Event *e)
return EVENT_CONT;
}
+void
+NetHandler::signalActivity()
+{
+#if HAVE_EVENTFD
+ uint64_t counter = 1;
+ ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t))); // write the 8-byte value 1 to the thread's evfd
+#elif TS_USE_PORT
+ PollDescriptor *pd = get_PollDescriptor(thread);
+ ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep)); // send thread->ep to the poll descriptor's port
+#else
+ char dummy = 1;
+ ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1)); // write a single byte to evpipe[1]
+#endif
+}
+
bool
NetHandler::manage_active_queue(bool ignore_queue_size = false)
{
diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc
index 17e0e39..cd5036e 100644
--- a/iocore/net/UnixNetVConnection.cc
+++ b/iocore/net/UnixNetVConnection.cc
@@ -817,8 +817,8 @@ UnixNetVConnection::reenable(VIO *vio)
nh->write_enable_list.push(this);
}
}
- if (nh->trigger_event && nh->trigger_event->ethread->signal_hook) {
- nh->trigger_event->ethread->signal_hook(nh->trigger_event->ethread);
+ if (nh->trigger_event) {
+ nh->trigger_event->ethread->tail_cb->signalActivity();
}
} else {
if (vio == &read.vio) {
--
To stop receiving notification emails like this one, please contact
['"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>'].