Posted to commits@trafficserver.apache.org by zw...@apache.org on 2020/05/20 17:50:14 UTC

[trafficserver] 01/02: cleanup the eventloop

This is an automated email from the ASF dual-hosted git repository.

zwoop pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit d15c45484e15ec7c141c6af66116c57dd2ce62e2
Author: Fei Deng <du...@gmail.com>
AuthorDate: Wed Oct 30 15:55:15 2019 -0500

    cleanup the eventloop
    
    (cherry picked from commit c179620382dc37eb33f39357f458175800756f14)
    
     Conflicts:
    	iocore/net/UnixNetAccept.cc
---
 iocore/aio/AIO.cc                         |  4 ++--
 iocore/cache/CacheWrite.cc                |  6 +++---
 iocore/dns/DNS.cc                         |  2 +-
 iocore/eventsystem/I_EThread.h            |  6 +-----
 iocore/eventsystem/I_EventProcessor.h     |  8 ++------
 iocore/eventsystem/I_ProtectedQueue.h     |  4 +---
 iocore/eventsystem/P_UnixEThread.h        | 13 ++-----------
 iocore/eventsystem/P_UnixEventProcessor.h | 18 ++----------------
 iocore/eventsystem/ProtectedQueue.cc      | 21 +--------------------
 iocore/eventsystem/UnixEThread.cc         | 16 +---------------
 iocore/net/UnixNetAccept.cc               |  2 +-
 11 files changed, 17 insertions(+), 83 deletions(-)

diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
index 5543919..df8e27b 100644
--- a/iocore/aio/AIO.cc
+++ b/iocore/aio/AIO.cc
@@ -480,9 +480,9 @@ aio_thread_main(void *arg)
         SCOPED_MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
         op->handleEvent(EVENT_NONE, nullptr);
       } else if (op->thread == AIO_CALLBACK_THREAD_ANY) {
-        eventProcessor.schedule_imm_signal(op);
+        eventProcessor.schedule_imm(op);
       } else {
-        op->thread->schedule_imm_signal(op);
+        op->thread->schedule_imm(op);
       }
       ink_mutex_acquire(&my_aio_req->aio_mutex);
     } while (true);
diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index 944d527..cff1818 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -360,7 +360,7 @@ Vol::aggWriteDone(int event, Event *e)
   CacheVC *c = nullptr;
   while ((c = sync.dequeue())) {
     if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
-      eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
     } else {
       sync.push(c); // put it back on the front
       break;
@@ -1028,7 +1028,7 @@ Lagain:
       ink_assert(false);
       while ((c = agg.dequeue())) {
         agg_todo_size -= c->agg_len;
-        eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+        eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
       }
       return EVENT_CONT;
     }
@@ -1092,7 +1092,7 @@ Lwait:
     if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
       ret = EVENT_RETURN;
     } else {
-      eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
     }
   }
   return ret;
diff --git a/iocore/dns/DNS.cc b/iocore/dns/DNS.cc
index 6713003..d33c4dc 100644
--- a/iocore/dns/DNS.cc
+++ b/iocore/dns/DNS.cc
@@ -1437,7 +1437,7 @@ DNSEntry::post(DNSHandler *h, HostEnt *ent)
   } else {
     mutex = action.mutex;
     SET_HANDLER(&DNSEntry::postOneEvent);
-    submit_thread->schedule_imm_signal(this);
+    submit_thread->schedule_imm(this);
   }
   return 0;
 }
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index c70fb04..45afc73 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -126,7 +126,6 @@ public:
 
   */
   Event *schedule_imm(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
-  Event *schedule_imm_signal(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
 
   /**
     Schedules the continuation on this EThread to receive an event
@@ -300,7 +299,7 @@ public:
   EThread &operator=(const EThread &) = delete;
   ~EThread() override;
 
-  Event *schedule(Event *e, bool fast_signal = false);
+  Event *schedule(Event *e);
 
   /** Block of memory to allocate thread specific data e.g. stat system arrays. */
   char thread_private[PER_THREAD_DATA];
@@ -314,9 +313,6 @@ public:
   ProtectedQueue EventQueueExternal;
   PriorityEventQueue EventQueue;
 
-  EThread **ethreads_to_be_signalled = nullptr;
-  int n_ethreads_to_be_signalled     = 0;
-
   static constexpr int NO_ETHREAD_ID = -1;
   int id                             = NO_ETHREAD_ID;
   unsigned int event_types           = 0;
diff --git a/iocore/eventsystem/I_EventProcessor.h b/iocore/eventsystem/I_EventProcessor.h
index 227ea19..782137a 100644
--- a/iocore/eventsystem/I_EventProcessor.h
+++ b/iocore/eventsystem/I_EventProcessor.h
@@ -151,11 +151,7 @@ public:
   */
   Event *schedule_imm(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
                       void *cookie = nullptr);
-  /*
-    provides the same functionality as schedule_imm and also signals the thread immediately
-  */
-  Event *schedule_imm_signal(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
-                             void *cookie = nullptr);
+
   /**
     Schedules the continuation on a specific thread group to receive an
     event at the given timeout. Requests the EventProcessor to schedule
@@ -331,7 +327,7 @@ public:
   | Unix & non NT Interface                                |
   \*------------------------------------------------------*/
 
-  Event *schedule(Event *e, EventType etype, bool fast_signal = false);
+  Event *schedule(Event *e, EventType etype);
   EThread *assign_thread(EventType etype);
   EThread *assign_affinity_by_type(Continuation *cont, EventType etype);
 
diff --git a/iocore/eventsystem/I_ProtectedQueue.h b/iocore/eventsystem/I_ProtectedQueue.h
index 7742fac..8bd0eeb 100644
--- a/iocore/eventsystem/I_ProtectedQueue.h
+++ b/iocore/eventsystem/I_ProtectedQueue.h
@@ -37,7 +37,7 @@
 #include "tscore/ink_platform.h"
 #include "I_Event.h"
 struct ProtectedQueue {
-  void enqueue(Event *e, bool fast_signal = false);
+  void enqueue(Event *e);
   void signal();
   int try_signal();             // Use non blocking lock and if acquired, signal
   void enqueue_local(Event *e); // Safe when called from the same thread
@@ -54,5 +54,3 @@ struct ProtectedQueue {
 
   ProtectedQueue();
 };
-
-void flush_signals(EThread *t);
diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h
index b65e3a7..55f093a 100644
--- a/iocore/eventsystem/P_UnixEThread.h
+++ b/iocore/eventsystem/P_UnixEThread.h
@@ -45,15 +45,6 @@ EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
 }
 
 TS_INLINE Event *
-EThread::schedule_imm_signal(Continuation *cont, int callback_event, void *cookie)
-{
-  Event *e          = ::eventAllocator.alloc();
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, 0, 0), true);
-}
-
-TS_INLINE Event *
 EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
 {
   Event *e          = ::eventAllocator.alloc();
@@ -85,7 +76,7 @@ EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, vo
 }
 
 TS_INLINE Event *
-EThread::schedule(Event *e, bool fast_signal)
+EThread::schedule(Event *e)
 {
   e->ethread = this;
   ink_assert(tt == REGULAR);
@@ -100,7 +91,7 @@ EThread::schedule(Event *e, bool fast_signal)
   // The continuation that gets scheduled later is not always the
   // client VC, it can be HttpCacheSM etc. so save the flags
   e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
-  EventQueueExternal.enqueue(e, fast_signal);
+  EventQueueExternal.enqueue(e);
   return e;
 }
 
diff --git a/iocore/eventsystem/P_UnixEventProcessor.h b/iocore/eventsystem/P_UnixEventProcessor.h
index 6e89d5b..fe89945 100644
--- a/iocore/eventsystem/P_UnixEventProcessor.h
+++ b/iocore/eventsystem/P_UnixEventProcessor.h
@@ -89,7 +89,7 @@ EventProcessor::assign_affinity_by_type(Continuation *cont, EventType etype)
 }
 
 TS_INLINE Event *
-EventProcessor::schedule(Event *e, EventType etype, bool fast_signal)
+EventProcessor::schedule(Event *e, EventType etype)
 {
   ink_assert(etype < MAX_EVENT_TYPES);
 
@@ -116,25 +116,11 @@ EventProcessor::schedule(Event *e, EventType etype, bool fast_signal)
   if (e->continuation->mutex) {
     e->mutex = e->continuation->mutex;
   }
-  e->ethread->EventQueueExternal.enqueue(e, fast_signal);
+  e->ethread->EventQueueExternal.enqueue(e);
   return e;
 }
 
 TS_INLINE Event *
-EventProcessor::schedule_imm_signal(Continuation *cont, EventType et, int callback_event, void *cookie)
-{
-  Event *e = eventAllocator.alloc();
-
-  ink_assert(et < MAX_EVENT_TYPES);
-#ifdef ENABLE_TIME_TRACE
-  e->start_time = Thread::get_hrtime();
-#endif
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, 0, 0), et, true);
-}
-
-TS_INLINE Event *
 EventProcessor::schedule_imm(Continuation *cont, EventType et, int callback_event, void *cookie)
 {
   Event *e = eventAllocator.alloc();
diff --git a/iocore/eventsystem/ProtectedQueue.cc b/iocore/eventsystem/ProtectedQueue.cc
index e8741c7..d8a14da 100644
--- a/iocore/eventsystem/ProtectedQueue.cc
+++ b/iocore/eventsystem/ProtectedQueue.cc
@@ -44,7 +44,7 @@
 extern ClassAllocator<Event> eventAllocator;
 
 void
-ProtectedQueue::enqueue(Event *e, bool fast_signal)
+ProtectedQueue::enqueue(Event *e)
 {
   ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
   EThread *e_ethread   = e->ethread;
@@ -62,25 +62,6 @@ ProtectedQueue::enqueue(Event *e, bool fast_signal)
 }
 
 void
-flush_signals(EThread *thr)
-{
-  ink_assert(this_ethread() == thr);
-  int n = thr->n_ethreads_to_be_signalled;
-  if (n > eventProcessor.n_ethreads) {
-    n = eventProcessor.n_ethreads; // MAX
-  }
-  int i;
-
-  for (i = 0; i < n; i++) {
-    if (thr->ethreads_to_be_signalled[i]) {
-      thr->ethreads_to_be_signalled[i]->tail_cb->signalActivity();
-      thr->ethreads_to_be_signalled[i] = nullptr;
-    }
-  }
-  thr->n_ethreads_to_be_signalled = 0;
-}
-
-void
 ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
 {
   (void)cur_time;
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index df44c71..06c9943 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -58,8 +58,6 @@ EThread::EThread()
 
 EThread::EThread(ThreadType att, int anid) : id(anid), tt(att)
 {
-  ethreads_to_be_signalled = static_cast<EThread **>(ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *)));
-  memset(ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *));
   memset(thread_private, 0, PER_THREAD_DATA);
 #if HAVE_EVENTFD
   evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
@@ -94,15 +92,7 @@ EThread::EThread(ThreadType att, Event *e) : tt(att), start_event(e)
 
 // Provide a destructor so that SDK functions which create and destroy
 // threads won't have to deal with EThread memory deallocation.
-EThread::~EThread()
-{
-  if (n_ethreads_to_be_signalled > 0) {
-    flush_signals(this);
-  }
-  ats_free(ethreads_to_be_signalled);
-  // TODO: This can't be deleted ....
-  // delete[]l1_hash;
-}
+EThread::~EThread() {}
 
 bool
 EThread::is_event_type(EventType et)
@@ -273,10 +263,6 @@ EThread::execute_regular()
       sleep_time = 0;
     }
 
-    if (n_ethreads_to_be_signalled) {
-      flush_signals(this);
-    }
-
     tail_cb->waitForActivity(sleep_time);
 
     // loop cleanup
diff --git a/iocore/net/UnixNetAccept.cc b/iocore/net/UnixNetAccept.cc
index 7109a27..89c1c96 100644
--- a/iocore/net/UnixNetAccept.cc
+++ b/iocore/net/UnixNetAccept.cc
@@ -360,7 +360,7 @@ NetAccept::do_blocking_accept(EThread *t)
     NetHandler *h   = get_NetHandler(localt);
     // Assign NetHandler->mutex to NetVC
     vc->mutex = h->mutex;
-    localt->schedule_imm_signal(vc);
+    localt->schedule_imm(vc);
   } while (loop);
 
   return 1;
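
For readers following the API change: the net effect of this cleanup is that the
schedule_imm_signal() variants on EventProcessor and EThread are removed, the
fast_signal/ethreads_to_be_signalled batching is dropped, and callers simply use
schedule_imm(); the target thread is now woken through its regular event loop
(tail_cb->waitForActivity()/signalActivity()). The snippet below is an
illustrative sketch only, not part of the patch; the function name
schedule_example and the parameters my_cont and my_thread are hypothetical,
and the include is assumed to pull in the Continuation, EThread and
eventProcessor declarations as in the headers touched above. The calls
themselves mirror the call sites updated in this diff (AIO.cc, DNS.cc,
UnixNetAccept.cc).

    #include "I_EventProcessor.h" // ATS event system (assumed to declare eventProcessor, Continuation, EThread)

    void
    schedule_example(Continuation *my_cont, EThread *my_thread)
    {
      // Schedule on any thread of the default (ET_CALL) group; there is no
      // longer a separate "signal" variant to choose between.
      eventProcessor.schedule_imm(my_cont);

      // Scheduling directly on a specific EThread follows the same pattern.
      my_thread->schedule_imm(my_cont);
    }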