Posted to commits@trafficserver.apache.org by am...@apache.org on 2017/10/19 12:27:20 UTC

[trafficserver] branch master updated: event loop changes

This is an automated email from the ASF dual-hosted git repository.

amc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 39992bb  event loop changes
39992bb is described below

commit 39992bb300dce9d4ebcbfdb76d78efd7fe4b72eb
Author: Fei Deng <du...@gmail.com>
AuthorDate: Tue Aug 29 12:54:06 2017 -0500

    event loop changes
    
    removed unnecessary defines
    
    moved to class initializer
    
    removed metrics collection
    
    extracted external queue processing code into new method
    
    use std::min instead of just min
    
    bug fix
    
    remove unnecessary arg
---
 iocore/eventsystem/I_EThread.h        |  53 ++++++++-
 iocore/eventsystem/I_ProtectedQueue.h |   2 +
 iocore/eventsystem/P_UnixEThread.h    |   6 +
 iocore/eventsystem/ProtectedQueue.cc  |  93 ++++-----------
 iocore/eventsystem/UnixEThread.cc     | 211 +++++++++++++++-------------------
 iocore/net/P_UnixNet.h                |  12 +-
 iocore/net/UnixNet.cc                 |  77 +++++++------
 iocore/net/UnixNetVConnection.cc      |   4 +-
 8 files changed, 226 insertions(+), 232 deletions(-)

diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index c916c1f..1739df2 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -86,6 +86,27 @@ extern bool shutdown_event_system;
 class EThread : public Thread
 {
 public:
+  /** Handler for tail of event loop.
+
+      The event loop should not spin. To avoid that, a tail handler is called to block for a limited time.
+      This is a protocol class that defines the interface to the handler.
+  */
+  class LoopTailHandler
+  {
+  public:
+    /** Called at the end of the event loop to block.
+        @a timeout is the maximum length of time (in ns) to block.
+    */
+    virtual int waitForActivity(ink_hrtime timeout) = 0;
+    /** Unblock.
+
+        This is required to unblock (wake up) a thread blocked in @c waitForActivity().
+    */
+    virtual void signalActivity() = 0;
+
+    virtual ~LoopTailHandler() {}
+  };
+
   /*-------------------------------------------------------*\
   |  Common Interface                                       |
   \*-------------------------------------------------------*/
@@ -262,6 +283,9 @@ public:
   */
   Event *schedule_spawn(Continuation *c, int ev = EVENT_IMMEDIATE, void *cookie = nullptr);
 
+  // Set the tail handler.
+  void set_tail_handler(LoopTailHandler *handler);
+
   /* private */
 
   Event *schedule_local(Event *e);
@@ -305,9 +329,11 @@ public:
   // Private Interface
 
   void execute() override;
+  void execute_regular();
+  void process_queue(Que(Event, link) * NegativeQueue);
   void process_event(Event *e, int calling_code);
   void free_event(Event *e);
-  void (*signal_hook)(EThread *) = nullptr;
+  LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;
 
 #if HAVE_EVENTFD
   int evfd = ts::NO_FD;
@@ -328,6 +354,31 @@ public:
   Event *start_event = nullptr;
 
   ServerSessionPool *server_session_pool = nullptr;
+
+  /** Default handler used until it is overridden.
+
+      This uses the condition variable wait in @a EventQueueExternal.
+  */
+  class DefaultTailHandler : public LoopTailHandler
+  {
+    DefaultTailHandler(ProtectedQueue &q) : _q(q) {}
+
+    int
+    waitForActivity(ink_hrtime timeout)
+    {
+      _q.wait(Thread::get_hrtime() + timeout);
+      return 0;
+    }
+    void
+    signalActivity()
+    {
+      _q.signal();
+    }
+
+    ProtectedQueue &_q;
+
+    friend class EThread;
+  } DEFAULT_TAIL_HANDLER = EventQueueExternal;
 };
 
 /**
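
[Note on the interface above] The per-thread signal_hook function pointer is replaced by a LoopTailHandler object; until something else is installed the thread falls back to DEFAULT_TAIL_HANDLER, which is just a condition variable wait on the external event queue. As a rough illustration of how a subsystem can supply its own handler (sketch only: the class below and its members are hypothetical, only EThread::LoopTailHandler, set_tail_handler() and ink_hrtime come from this patch):

  #include <chrono>
  #include <condition_variable>
  #include <mutex>

  // Hypothetical subsystem-specific tail handler (not part of this commit).
  class ExampleTailHandler : public EThread::LoopTailHandler
  {
  public:
    // Block for at most @a timeout nanoseconds, or until signalActivity() is called.
    int
    waitForActivity(ink_hrtime timeout) override
    {
      std::unique_lock<std::mutex> lock(_mutex);
      _cond.wait_for(lock, std::chrono::nanoseconds(timeout));
      return 0;
    }

    // Wake a thread blocked in waitForActivity().
    void
    signalActivity() override
    {
      _cond.notify_one();
    }

  private:
    std::mutex _mutex;
    std::condition_variable _cond;
  };

  // Installed from the owning thread, e.g.
  //   thread->set_tail_handler(&example_handler);
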
diff --git a/iocore/eventsystem/I_ProtectedQueue.h b/iocore/eventsystem/I_ProtectedQueue.h
index 7dfba98..483c872 100644
--- a/iocore/eventsystem/I_ProtectedQueue.h
+++ b/iocore/eventsystem/I_ProtectedQueue.h
@@ -45,6 +45,8 @@ struct ProtectedQueue {
   void remove(Event *e);
   Event *dequeue_local();
   void dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep);
+  void dequeue_external();       // Dequeue any external events.
+  void wait(ink_hrtime timeout); // If there are no events, block on a condition variable until the absolute time @a timeout.
 
   InkAtomicList al;
   ink_mutex lock;
diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h
index 0b68e5e..e0135a6 100644
--- a/iocore/eventsystem/P_UnixEThread.h
+++ b/iocore/eventsystem/P_UnixEThread.h
@@ -178,4 +178,10 @@ EThread::free_event(Event *e)
   EVENT_FREE(e, eventAllocator, this);
 }
 
+TS_INLINE void
+EThread::set_tail_handler(LoopTailHandler *handler)
+{
+  ink_atomic_swap(&tail_cb, handler);
+}
+
 #endif /*_EThread_h_*/
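
[Note] set_tail_handler() swaps the pointer with ink_atomic_swap so a handler can be installed while the event loop may already be running in another thread. In standard C++ terms the publish step is roughly equivalent to the following (illustration only, not the ATS implementation):

  #include <atomic>

  // tail_cb expressed as a std::atomic pointer; in the patch it is a plain
  // pointer updated with ink_atomic_swap.
  std::atomic<EThread::LoopTailHandler *> tail_cb{nullptr};

  void
  set_tail_handler(EThread::LoopTailHandler *handler)
  {
    tail_cb.exchange(handler); // atomically replace the current handler
  }
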
diff --git a/iocore/eventsystem/ProtectedQueue.cc b/iocore/eventsystem/ProtectedQueue.cc
index c0e1d8e..bb9466b 100644
--- a/iocore/eventsystem/ProtectedQueue.cc
+++ b/iocore/eventsystem/ProtectedQueue.cc
@@ -56,55 +56,7 @@ ProtectedQueue::enqueue(Event *e, bool fast_signal)
     // queue e->ethread in the list of threads to be signalled
     // inserting_thread == 0 means it is not a regular EThread
     if (inserting_thread != e_ethread) {
-      if (!inserting_thread || !inserting_thread->ethreads_to_be_signalled) {
-        signal();
-        if (fast_signal) {
-          if (e_ethread->signal_hook) {
-            e_ethread->signal_hook(e_ethread);
-          }
-        }
-      } else {
-#ifdef EAGER_SIGNALLING
-        // Try to signal now and avoid deferred posting.
-        if (e_ethread->EventQueueExternal.try_signal())
-          return;
-#endif
-        if (fast_signal) {
-          if (e_ethread->signal_hook) {
-            e_ethread->signal_hook(e_ethread);
-          }
-        }
-        int &t          = inserting_thread->n_ethreads_to_be_signalled;
-        EThread **sig_e = inserting_thread->ethreads_to_be_signalled;
-        if ((t + 1) >= eventProcessor.n_ethreads) {
-          // we have run out of room
-          if ((t + 1) == eventProcessor.n_ethreads) {
-            // convert to direct map, put each ethread (sig_e[i]) into
-            // the direct map loation: sig_e[sig_e[i]->id]
-            for (int i = 0; i < t; i++) {
-              EThread *cur = sig_e[i]; // put this ethread
-              while (cur) {
-                EThread *next = sig_e[cur->id]; // into this location
-                if (next == cur) {
-                  break;
-                }
-                sig_e[cur->id] = cur;
-                cur            = next;
-              }
-              // if not overwritten
-              if (sig_e[i] && sig_e[i]->id != i) {
-                sig_e[i] = nullptr;
-              }
-            }
-            t++;
-          }
-          // we have a direct map, insert this EThread
-          sig_e[e_ethread->id] = e_ethread;
-        } else {
-          // insert into vector
-          sig_e[t++] = e_ethread;
-        }
-      }
+      e_ethread->tail_cb->signalActivity();
     }
   }
 }
@@ -119,24 +71,9 @@ flush_signals(EThread *thr)
   }
   int i;
 
-// Since the lock is only there to prevent a race in ink_cond_timedwait
-// the lock is taken only for a short time, thus it is unlikely that
-// this code has any effect.
-#ifdef EAGER_SIGNALLING
   for (i = 0; i < n; i++) {
-    // Try to signal as many threads as possible without blocking.
     if (thr->ethreads_to_be_signalled[i]) {
-      if (thr->ethreads_to_be_signalled[i]->EventQueueExternal.try_signal())
-        thr->ethreads_to_be_signalled[i] = 0;
-    }
-  }
-#endif
-  for (i = 0; i < n; i++) {
-    if (thr->ethreads_to_be_signalled[i]) {
-      thr->ethreads_to_be_signalled[i]->EventQueueExternal.signal();
-      if (thr->ethreads_to_be_signalled[i]->signal_hook) {
-        thr->ethreads_to_be_signalled[i]->signal_hook(thr->ethreads_to_be_signalled[i]);
-      }
+      thr->ethreads_to_be_signalled[i]->tail_cb->signalActivity();
       thr->ethreads_to_be_signalled[i] = nullptr;
     }
   }
@@ -147,17 +84,16 @@ void
 ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
 {
   (void)cur_time;
-  Event *e;
   if (sleep) {
-    ink_mutex_acquire(&lock);
-    if (INK_ATOMICLIST_EMPTY(al)) {
-      timespec ts = ink_hrtime_to_timespec(timeout);
-      ink_cond_timedwait(&might_have_data, &lock, &ts);
-    }
-    ink_mutex_release(&lock);
+    this->wait(timeout);
   }
+  this->dequeue_external();
+}
 
-  e = (Event *)ink_atomiclist_popall(&al);
+void
+ProtectedQueue::dequeue_external()
+{
+  Event *e = (Event *)ink_atomiclist_popall(&al);
   // invert the list, to preserve order
   SLL<Event, Event::Link_link> l, t;
   t.head = e;
@@ -174,3 +110,14 @@ ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool slee
     }
   }
 }
+
+void
+ProtectedQueue::wait(ink_hrtime timeout)
+{
+  ink_mutex_acquire(&lock);
+  if (INK_ATOMICLIST_EMPTY(al)) {
+    timespec ts = ink_hrtime_to_timespec(timeout);
+    ink_cond_timedwait(&might_have_data, &lock, &ts);
+  }
+  ink_mutex_release(&lock);
+}
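
[Note] With the deferred-signalling machinery gone, enqueue() simply pushes onto the atomic list and pokes the target thread's tail handler, and dequeue_timed() is now a thin wrapper over wait() plus dequeue_external(). The resulting producer/consumer pairing, in outline (sketch only, assuming the ATS types above; the real enqueue() also handles the same-thread and non-EThread cases):

  // Producer side, any thread (the core of ProtectedQueue::enqueue()):
  void
  producer_side(Event *e, EThread *target, ProtectedQueue &q)
  {
    ink_atomiclist_push(&q.al, e);       // hand the event over on the lock-free list
    target->tail_cb->signalActivity();   // wake the target thread if it is blocked
  }

  // Consumer side, the owning EThread (the tail of execute_regular()):
  void
  consumer_side(EThread *t, ink_hrtime sleep_time)
  {
    t->tail_cb->waitForActivity(sleep_time);  // DefaultTailHandler ends up in ProtectedQueue::wait()
    t->EventQueueExternal.dequeue_external(); // drain the atomic list into the local queue
    while (Event *e = t->EventQueueExternal.dequeue_local()) {
      // classify e: immediate, interval, or negative ...
    }
  }
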
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index 6544e60..82d074f 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -142,6 +142,97 @@ EThread::process_event(Event *e, int calling_code)
   }
 }
 
+void EThread::process_queue(Que(Event, link) * NegativeQueue)
+{
+  Event *e;
+
+  // Move events from the external thread safe queues to the local queue.
+  EventQueueExternal.dequeue_external();
+
+  // execute all the available external events that have
+  // already been dequeued
+  while ((e = EventQueueExternal.dequeue_local())) {
+    if (e->cancelled) {
+      free_event(e);
+    } else if (!e->timeout_at) { // IMMEDIATE
+      ink_assert(e->period == 0);
+      process_event(e, e->callback_event);
+    } else if (e->timeout_at > 0) { // INTERVAL
+      EventQueue.enqueue(e, cur_time);
+    } else { // NEGATIVE
+      Event *p = nullptr;
+      Event *a = NegativeQueue->head;
+      while (a && a->timeout_at > e->timeout_at) {
+        p = a;
+        a = a->link.next;
+      }
+      if (!a) {
+        NegativeQueue->enqueue(e);
+      } else {
+        NegativeQueue->insert(e, p);
+      }
+    }
+  }
+}
+
+void
+EThread::execute_regular()
+{
+  Event *e;
+  Que(Event, link) NegativeQueue;
+  ink_hrtime next_time = 0;
+
+  // give priority to immediate events
+  for (;;) {
+    if (unlikely(shutdown_event_system == true)) {
+      return;
+    }
+
+    process_queue(&NegativeQueue);
+
+    bool done_one;
+    do {
+      done_one = false;
+      // execute all the eligible internal events
+      EventQueue.check_ready(cur_time, this);
+      while ((e = EventQueue.dequeue_ready(cur_time))) {
+        ink_assert(e);
+        ink_assert(e->timeout_at > 0);
+        if (e->cancelled)
+          free_event(e);
+        else {
+          done_one = true;
+          process_event(e, e->callback_event);
+        }
+      }
+    } while (done_one);
+
+    // execute any negative (poll) events
+    if (NegativeQueue.head) {
+      process_queue(&NegativeQueue);
+
+      // execute poll events
+      while ((e = NegativeQueue.dequeue())) {
+        process_event(e, EVENT_POLL);
+      }
+    }
+
+    next_time             = EventQueue.earliest_timeout();
+    ink_hrtime sleep_time = next_time - Thread::get_hrtime_updated();
+    if (sleep_time > 0) {
+      sleep_time = std::min(sleep_time, HRTIME_MSECONDS(THREAD_MAX_HEARTBEAT_MSECONDS));
+    } else {
+      sleep_time = 0;
+    }
+
+    if (n_ethreads_to_be_signalled) {
+      flush_signals(this);
+    }
+
+    tail_cb->waitForActivity(sleep_time);
+  }
+}
+
 //
 // void  EThread::execute()
 //
@@ -168,128 +259,12 @@ EThread::execute()
 
   switch (tt) {
   case REGULAR: {
-    Event *e;
-    Que(Event, link) NegativeQueue;
-    ink_hrtime next_time = 0;
-
-    // give priority to immediate events
-    for (;;) {
-      if (unlikely(shutdown_event_system == true)) {
-        return;
-      }
-      // execute all the available external events that have
-      // already been dequeued
-      cur_time = Thread::get_hrtime_updated();
-      while ((e = EventQueueExternal.dequeue_local())) {
-        if (e->cancelled) {
-          free_event(e);
-        } else if (!e->timeout_at) { // IMMEDIATE
-          ink_assert(e->period == 0);
-          process_event(e, e->callback_event);
-        } else if (e->timeout_at > 0) { // INTERVAL
-          EventQueue.enqueue(e, cur_time);
-        } else { // NEGATIVE
-          Event *p = nullptr;
-          Event *a = NegativeQueue.head;
-          while (a && a->timeout_at > e->timeout_at) {
-            p = a;
-            a = a->link.next;
-          }
-          if (!a) {
-            NegativeQueue.enqueue(e);
-          } else {
-            NegativeQueue.insert(e, p);
-          }
-        }
-      }
-      bool done_one;
-      do {
-        done_one = false;
-        // execute all the eligible internal events
-        EventQueue.check_ready(cur_time, this);
-        while ((e = EventQueue.dequeue_ready(cur_time))) {
-          ink_assert(e);
-          ink_assert(e->timeout_at > 0);
-          if (e->cancelled) {
-            free_event(e);
-          } else {
-            done_one = true;
-            process_event(e, e->callback_event);
-          }
-        }
-      } while (done_one);
-      // execute any negative (poll) events
-      if (NegativeQueue.head) {
-        if (n_ethreads_to_be_signalled) {
-          flush_signals(this);
-        }
-        // dequeue all the external events and put them in a local
-        // queue. If there are no external events available, don't
-        // do a cond_timedwait.
-        if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al)) {
-          EventQueueExternal.dequeue_timed(cur_time, next_time, false);
-        }
-        while ((e = EventQueueExternal.dequeue_local())) {
-          if (!e->timeout_at) {
-            process_event(e, e->callback_event);
-          } else {
-            if (e->cancelled) {
-              free_event(e);
-            } else {
-              // If its a negative event, it must be a result of
-              // a negative event, which has been turned into a
-              // timed-event (because of a missed lock), executed
-              // before the poll. So, it must
-              // be executed in this round (because you can't have
-              // more than one poll between two executions of a
-              // negative event)
-              if (e->timeout_at < 0) {
-                Event *p = nullptr;
-                Event *a = NegativeQueue.head;
-                while (a && a->timeout_at > e->timeout_at) {
-                  p = a;
-                  a = a->link.next;
-                }
-                if (!a) {
-                  NegativeQueue.enqueue(e);
-                } else {
-                  NegativeQueue.insert(e, p);
-                }
-              } else {
-                EventQueue.enqueue(e, cur_time);
-              }
-            }
-          }
-        }
-        // execute poll events
-        while ((e = NegativeQueue.dequeue())) {
-          process_event(e, EVENT_POLL);
-        }
-        if (!INK_ATOMICLIST_EMPTY(EventQueueExternal.al)) {
-          EventQueueExternal.dequeue_timed(cur_time, next_time, false);
-        }
-      } else { // Means there are no negative events
-        next_time             = EventQueue.earliest_timeout();
-        ink_hrtime sleep_time = next_time - cur_time;
-
-        if (sleep_time > THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND) {
-          next_time = cur_time + THREAD_MAX_HEARTBEAT_MSECONDS * HRTIME_MSECOND;
-        }
-        // dequeue all the external events and put them in a local
-        // queue. If there are no external events available, do a
-        // cond_timedwait.
-        if (n_ethreads_to_be_signalled) {
-          flush_signals(this);
-        }
-        EventQueueExternal.dequeue_timed(cur_time, next_time, true);
-      }
-    }
+    this->execute_regular();
+    break;
   }
-
   case DEDICATED: {
     break;
   }
-
   default:
     ink_assert(!"bad case value (execute)");
     break;
diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h
index c0f6338..aea295a 100644
--- a/iocore/net/P_UnixNet.h
+++ b/iocore/net/P_UnixNet.h
@@ -220,9 +220,12 @@ struct PollCont : public Continuation {
 // A NetHandler handles the Network IO operations.  It maintains
 // lists of operations at multiples of its periodicity.
 //
-class NetHandler : public Continuation
+class NetHandler : public Continuation, public EThread::LoopTailHandler
 {
 public:
+  // @a thread and @a trigger_event are redundant: you can get the former from the latter.
+  // If we don't get rid of @a trigger_event, we should remove @a thread.
+  EThread *thread;
   Event *trigger_event;
   QueM(UnixNetVConnection, NetState, read, ready_link) read_ready_list;
   QueM(UnixNetVConnection, NetState, write, ready_link) write_ready_list;
@@ -247,7 +250,7 @@ public:
 
   int startNetEvent(int event, Event *data);
   int mainNetEvent(int event, Event *data);
-  int mainNetEventExt(int event, Event *data);
+  int waitForActivity(ink_hrtime timeout);
   void process_enabled_list();
   void process_ready_list();
   void manage_keep_alive_queue();
@@ -296,6 +299,9 @@ public:
    */
   void stopCop(UnixNetVConnection *netvc);
 
+  // Signal the epoll_wait to terminate.
+  void signalActivity();
+
   /**
     Release a netvc and free it.
 
@@ -742,7 +748,7 @@ NetHandler::startIO(UnixNetVConnection *netvc)
   ink_assert(netvc->thread == this_ethread());
   int res = 0;
 
-  PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread);
+  PollDescriptor *pd = get_PollDescriptor(this->thread);
   if (netvc->ep.start(pd, netvc, EVENTIO_READ | EVENTIO_WRITE) < 0) {
     res = errno;
     // EEXIST should be ok, though it should have been cleared before we got back here
diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc
index 631b4b2..94ac15b 100644
--- a/iocore/net/UnixNet.cc
+++ b/iocore/net/UnixNet.cc
@@ -202,31 +202,18 @@ net_signal_hook_callback(EThread *thread)
 #endif
 }
 
-static void
-net_signal_hook_function(EThread *thread)
-{
-#if HAVE_EVENTFD
-  uint64_t counter = 1;
-  ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
-#elif TS_USE_PORT
-  PollDescriptor *pd = get_PollDescriptor(thread);
-  ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
-#else
-  char dummy = 1;
-  ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
-#endif
-}
-
 void
 initialize_thread_for_net(EThread *thread)
 {
-  new ((ink_dummy_for_new *)get_NetHandler(thread)) NetHandler();
-  new ((ink_dummy_for_new *)get_PollCont(thread)) PollCont(thread->mutex, get_NetHandler(thread));
-  get_NetHandler(thread)->mutex = new_ProxyMutex();
-  PollCont *pc                  = get_PollCont(thread);
-  PollDescriptor *pd            = pc->pollDescriptor;
+  NetHandler *nh = get_NetHandler(thread);
+
+  new ((ink_dummy_for_new *)nh) NetHandler();
+  new ((ink_dummy_for_new *)get_PollCont(thread)) PollCont(thread->mutex, nh);
+  nh->mutex  = new_ProxyMutex();
+  nh->thread = thread;
 
-  thread->schedule_imm(get_NetHandler(thread));
+  PollCont *pc       = get_PollCont(thread);
+  PollDescriptor *pd = pc->pollDescriptor;
 
   InactivityCop *inactivityCop = new InactivityCop(get_NetHandler(thread)->mutex);
   int cop_freq                 = 1;
@@ -234,9 +221,9 @@ initialize_thread_for_net(EThread *thread)
   REC_ReadConfigInteger(cop_freq, "proxy.config.net.inactivity_check_frequency");
   thread->schedule_every(inactivityCop, HRTIME_SECONDS(cop_freq));
 
-  thread->signal_hook = net_signal_hook_function;
-  thread->ep          = (EventIO *)ats_malloc(sizeof(EventIO));
-  thread->ep->type    = EVENTIO_ASYNC_SIGNAL;
+  thread->set_tail_handler(nh);
+  thread->ep       = (EventIO *)ats_malloc(sizeof(EventIO));
+  thread->ep->type = EVENTIO_ASYNC_SIGNAL;
 #if HAVE_EVENTFD
   thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ);
 #else
@@ -311,7 +298,7 @@ update_nethandler_config(const char *name, RecDataT data_type ATS_UNUSED, RecDat
 void
 NetHandler::free_netvc(UnixNetVConnection *netvc)
 {
-  EThread *t = trigger_event->ethread;
+  EThread *t = this->thread;
 
   ink_assert(t == this_ethread());
   ink_release_assert(netvc->thread == t);
@@ -357,9 +344,7 @@ NetHandler::startNetEvent(int event, Event *e)
   configure_per_thread();
 
   (void)event;
-  SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
-  e->schedule_every(-HRTIME_MSECONDS(net_event_period));
-  trigger_event = e;
+
   return EVENT_CONT;
 }
 
@@ -408,7 +393,7 @@ NetHandler::process_ready_list()
     if (vc->closed)
       free_netvc(vc);
     else if (vc->read.enabled && vc->read.triggered)
-      vc->net_read_io(this, trigger_event->ethread);
+      vc->net_read_io(this, this->thread);
     else if (!vc->read.enabled) {
       read_ready_list.remove(vc);
 #if defined(solaris)
@@ -425,7 +410,7 @@ NetHandler::process_ready_list()
     if (vc->closed)
       free_netvc(vc);
     else if (vc->write.enabled && vc->write.triggered)
-      write_to_net(this, vc, trigger_event->ethread);
+      write_to_net(this, vc, this->thread);
     else if (!vc->write.enabled) {
       write_ready_list.remove(vc);
 #if defined(solaris)
@@ -443,7 +428,7 @@ NetHandler::process_ready_list()
     if (vc->closed)
       free_netvc(vc);
     else if (vc->read.enabled && vc->read.triggered)
-      vc->net_read_io(this, trigger_event->ethread);
+      vc->net_read_io(this, this->thread);
     else if (!vc->read.enabled)
       vc->ep.modify(-EVENTIO_READ);
   }
@@ -452,7 +437,7 @@ NetHandler::process_ready_list()
     if (vc->closed)
       free_netvc(vc);
     else if (vc->write.enabled && vc->write.triggered)
-      write_to_net(this, vc, trigger_event->ethread);
+      write_to_net(this, vc, this->thread);
     else if (!vc->write.enabled)
       vc->ep.modify(-EVENTIO_WRITE);
   }
@@ -469,18 +454,25 @@ NetHandler::mainNetEvent(int event, Event *e)
   ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
   (void)event;
   (void)e;
+  return this->waitForActivity(-1);
+}
+
+int
+NetHandler::waitForActivity(ink_hrtime timeout)
+{
   EventIO *epd = nullptr;
 
   NET_INCREMENT_DYN_STAT(net_handler_run_stat);
+  SCOPED_MUTEX_LOCK(lock, mutex, this->thread);
 
   process_enabled_list();
 
   // Polling event by PollCont
-  PollCont *p = get_PollCont(trigger_event->ethread);
+  PollCont *p = get_PollCont(this->thread);
   p->handleEvent(EVENT_NONE, nullptr);
 
   // Get & Process polling result
-  PollDescriptor *pd     = get_PollDescriptor(trigger_event->ethread);
+  PollDescriptor *pd     = get_PollDescriptor(this->thread);
   UnixNetVConnection *vc = nullptr;
   for (int x = 0; x < pd->result; x++) {
     epd = (EventIO *)get_ev_data(pd, x);
@@ -521,7 +513,7 @@ NetHandler::mainNetEvent(int event, Event *e)
 #endif
       }
     } else if (epd->type == EVENTIO_ASYNC_SIGNAL) {
-      net_signal_hook_callback(trigger_event->ethread);
+      net_signal_hook_callback(this->thread);
     }
     ev_next_event(pd, x);
   }
@@ -533,6 +525,21 @@ NetHandler::mainNetEvent(int event, Event *e)
   return EVENT_CONT;
 }
 
+void
+NetHandler::signalActivity()
+{
+#if HAVE_EVENTFD
+  uint64_t counter = 1;
+  ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t)));
+#elif TS_USE_PORT
+  PollDescriptor *pd = get_PollDescriptor(thread);
+  ATS_UNUSED_RETURN(port_send(pd->port_fd, 0, thread->ep));
+#else
+  char dummy = 1;
+  ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1));
+#endif
+}
+
 bool
 NetHandler::manage_active_queue(bool ignore_queue_size = false)
 {
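
[Note] NetHandler::signalActivity() is the old net_signal_hook_function moved onto the handler: it wakes the net thread's poll by writing to a descriptor that the thread's poll set is watching. For the HAVE_EVENTFD case, the mechanism in isolation looks roughly like this (standalone Linux-only sketch with hypothetical names, not ATS code):

  #include <sys/eventfd.h>
  #include <unistd.h>
  #include <cstdint>

  // Created by the polling thread and added to its epoll/poll set
  // (in ATS this is thread->evfd, registered via thread->ep->start()).
  int wakeup_fd = eventfd(0, EFD_NONBLOCK);

  // Any thread: make the next poll wake up immediately (cf. signalActivity()).
  void
  wake()
  {
    uint64_t counter = 1;
    (void)write(wakeup_fd, &counter, sizeof(counter));
  }

  // The polling thread, when the fd reports readable: consume the counter so
  // the descriptor goes quiet again (cf. net_signal_hook_callback()).
  void
  drain()
  {
    uint64_t counter = 0;
    (void)read(wakeup_fd, &counter, sizeof(counter));
  }
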
diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc
index 17e0e39..cd5036e 100644
--- a/iocore/net/UnixNetVConnection.cc
+++ b/iocore/net/UnixNetVConnection.cc
@@ -817,8 +817,8 @@ UnixNetVConnection::reenable(VIO *vio)
           nh->write_enable_list.push(this);
         }
       }
-      if (nh->trigger_event && nh->trigger_event->ethread->signal_hook) {
-        nh->trigger_event->ethread->signal_hook(nh->trigger_event->ethread);
+      if (nh->trigger_event) {
+        nh->trigger_event->ethread->tail_cb->signalActivity();
       }
     } else {
       if (vio == &read.vio) {
