You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2020/05/20 17:50:13 UTC

[trafficserver] branch 9.0.x updated (62e1dcb -> 96eaa3aa)

This is an automated email from the ASF dual-hosted git repository.

zwoop pushed a change to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git.


    from 62e1dcb  Issue 3546: Add "overridable" to the configuration variable description.
     new d15c454  cleanup the eventloop
     new 96eaa3aa put events into local queue when scheduling on the same thread as the scheduler

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 iocore/aio/AIO.cc                                  |  4 +--
 iocore/cache/CacheWrite.cc                         |  6 ++--
 iocore/dns/DNS.cc                                  |  2 +-
 iocore/eventsystem/I_EThread.h                     |  6 +---
 iocore/eventsystem/I_EventProcessor.h              |  8 ++---
 iocore/eventsystem/I_ProtectedQueue.h              |  4 +--
 iocore/eventsystem/P_UnixEThread.h                 | 19 +++++-------
 iocore/eventsystem/P_UnixEventProcessor.h          | 36 +++++++++-------------
 iocore/eventsystem/ProtectedQueue.cc               | 23 ++------------
 iocore/eventsystem/UnixEThread.cc                  | 16 +---------
 iocore/net/UnixNetAccept.cc                        |  2 +-
 .../null_transform/gold/null_transform-tag.gold    |  2 +-
 12 files changed, 37 insertions(+), 91 deletions(-)


[trafficserver] 02/02: put events into local queue when scheduling on the same thread as the scheduler

Posted by zw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zwoop pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit 96eaa3aa44129eb06c46e1dec945e63c47bead6a
Author: Fei Deng <du...@gmail.com>
AuthorDate: Thu Oct 31 14:16:10 2019 -0500

    put events into local queue when scheduling on the same thread as the scheduler
    
    update docs
    
    (cherry picked from commit 6a1c5f7397af98dd45d1e6f227219ab463c6363a)
---
 iocore/eventsystem/P_UnixEThread.h                 |  8 +++++++-
 iocore/eventsystem/P_UnixEventProcessor.h          | 22 ++++++++++++++--------
 iocore/eventsystem/ProtectedQueue.cc               |  2 +-
 .../null_transform/gold/null_transform-tag.gold    |  2 +-
 4 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h
index 55f093a..520ffdf 100644
--- a/iocore/eventsystem/P_UnixEThread.h
+++ b/iocore/eventsystem/P_UnixEThread.h
@@ -91,7 +91,13 @@ EThread::schedule(Event *e)
   // The continuation that gets scheduled later is not always the
   // client VC, it can be HttpCacheSM etc. so save the flags
   e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
-  EventQueueExternal.enqueue(e);
+
+  if (e->ethread == this_ethread()) {
+    EventQueueExternal.enqueue_local(e);
+  } else {
+    EventQueueExternal.enqueue(e);
+  }
+
   return e;
 }
 
diff --git a/iocore/eventsystem/P_UnixEventProcessor.h b/iocore/eventsystem/P_UnixEventProcessor.h
index fe89945..a8ba4f4 100644
--- a/iocore/eventsystem/P_UnixEventProcessor.h
+++ b/iocore/eventsystem/P_UnixEventProcessor.h
@@ -97,18 +97,18 @@ EventProcessor::schedule(Event *e, EventType etype)
     return nullptr;
   }
 
-  EThread *ethread = e->continuation->getThreadAffinity();
-  if (ethread != nullptr && ethread->is_event_type(etype)) {
-    e->ethread = ethread;
+  EThread *affinity_thread = e->continuation->getThreadAffinity();
+  EThread *curr_thread     = this_ethread();
+  if (affinity_thread != nullptr && affinity_thread->is_event_type(etype)) {
+    e->ethread = affinity_thread;
   } else {
-    ethread = this_ethread();
     // Is the current thread eligible?
-    if (ethread != nullptr && ethread->is_event_type(etype)) {
-      e->ethread = ethread;
+    if (curr_thread != nullptr && curr_thread->is_event_type(etype)) {
+      e->ethread = curr_thread;
     } else {
       e->ethread = assign_thread(etype);
     }
-    if (e->continuation->getThreadAffinity() == nullptr) {
+    if (affinity_thread == nullptr) {
       e->continuation->setThreadAffinity(e->ethread);
     }
   }
@@ -116,7 +116,13 @@ EventProcessor::schedule(Event *e, EventType etype)
   if (e->continuation->mutex) {
     e->mutex = e->continuation->mutex;
   }
-  e->ethread->EventQueueExternal.enqueue(e);
+
+  if (curr_thread != nullptr && e->ethread == curr_thread) {
+    e->ethread->EventQueueExternal.enqueue_local(e);
+  } else {
+    e->ethread->EventQueueExternal.enqueue(e);
+  }
+
   return e;
 }
 
diff --git a/iocore/eventsystem/ProtectedQueue.cc b/iocore/eventsystem/ProtectedQueue.cc
index d8a14da..d7936c6 100644
--- a/iocore/eventsystem/ProtectedQueue.cc
+++ b/iocore/eventsystem/ProtectedQueue.cc
@@ -101,7 +101,7 @@ ProtectedQueue::wait(ink_hrtime timeout)
    *   - And then the Event Thread goes to sleep and waits for the wakeup signal of `EThread::might_have_data`,
    *   - The `EThread::lock` will be locked again when the Event Thread wakes up.
    */
-  if (INK_ATOMICLIST_EMPTY(al)) {
+  if (INK_ATOMICLIST_EMPTY(al) && localQueue.empty()) {
     timespec ts = ink_hrtime_to_timespec(timeout);
     ink_cond_timedwait(&might_have_data, &lock, &ts);
   }
diff --git a/tests/gold_tests/null_transform/gold/null_transform-tag.gold b/tests/gold_tests/null_transform/gold/null_transform-tag.gold
index 7f60846..733c4d1 100644
--- a/tests/gold_tests/null_transform/gold/null_transform-tag.gold
+++ b/tests/gold_tests/null_transform/gold/null_transform-tag.gold
@@ -1 +1 @@
-``DIAG: (null_transform)``
\ No newline at end of file
+``DIAG: (null_transform)``


[trafficserver] 01/02: cleanup the eventloop

Posted by zw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zwoop pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit d15c45484e15ec7c141c6af66116c57dd2ce62e2
Author: Fei Deng <du...@gmail.com>
AuthorDate: Wed Oct 30 15:55:15 2019 -0500

    cleanup the eventloop
    
    (cherry picked from commit c179620382dc37eb33f39357f458175800756f14)
    
     Conflicts:
    	iocore/net/UnixNetAccept.cc
---
 iocore/aio/AIO.cc                         |  4 ++--
 iocore/cache/CacheWrite.cc                |  6 +++---
 iocore/dns/DNS.cc                         |  2 +-
 iocore/eventsystem/I_EThread.h            |  6 +-----
 iocore/eventsystem/I_EventProcessor.h     |  8 ++------
 iocore/eventsystem/I_ProtectedQueue.h     |  4 +---
 iocore/eventsystem/P_UnixEThread.h        | 13 ++-----------
 iocore/eventsystem/P_UnixEventProcessor.h | 18 ++----------------
 iocore/eventsystem/ProtectedQueue.cc      | 21 +--------------------
 iocore/eventsystem/UnixEThread.cc         | 16 +---------------
 iocore/net/UnixNetAccept.cc               |  2 +-
 11 files changed, 17 insertions(+), 83 deletions(-)

diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
index 5543919..df8e27b 100644
--- a/iocore/aio/AIO.cc
+++ b/iocore/aio/AIO.cc
@@ -480,9 +480,9 @@ aio_thread_main(void *arg)
         SCOPED_MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
         op->handleEvent(EVENT_NONE, nullptr);
       } else if (op->thread == AIO_CALLBACK_THREAD_ANY) {
-        eventProcessor.schedule_imm_signal(op);
+        eventProcessor.schedule_imm(op);
       } else {
-        op->thread->schedule_imm_signal(op);
+        op->thread->schedule_imm(op);
       }
       ink_mutex_acquire(&my_aio_req->aio_mutex);
     } while (true);
diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index 944d527..cff1818 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -360,7 +360,7 @@ Vol::aggWriteDone(int event, Event *e)
   CacheVC *c = nullptr;
   while ((c = sync.dequeue())) {
     if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
-      eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
     } else {
       sync.push(c); // put it back on the front
       break;
@@ -1028,7 +1028,7 @@ Lagain:
       ink_assert(false);
       while ((c = agg.dequeue())) {
         agg_todo_size -= c->agg_len;
-        eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+        eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
       }
       return EVENT_CONT;
     }
@@ -1092,7 +1092,7 @@ Lwait:
     if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
       ret = EVENT_RETURN;
     } else {
-      eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+      eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
     }
   }
   return ret;
diff --git a/iocore/dns/DNS.cc b/iocore/dns/DNS.cc
index 6713003..d33c4dc 100644
--- a/iocore/dns/DNS.cc
+++ b/iocore/dns/DNS.cc
@@ -1437,7 +1437,7 @@ DNSEntry::post(DNSHandler *h, HostEnt *ent)
   } else {
     mutex = action.mutex;
     SET_HANDLER(&DNSEntry::postOneEvent);
-    submit_thread->schedule_imm_signal(this);
+    submit_thread->schedule_imm(this);
   }
   return 0;
 }
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index c70fb04..45afc73 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -126,7 +126,6 @@ public:
 
   */
   Event *schedule_imm(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
-  Event *schedule_imm_signal(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
 
   /**
     Schedules the continuation on this EThread to receive an event
@@ -300,7 +299,7 @@ public:
   EThread &operator=(const EThread &) = delete;
   ~EThread() override;
 
-  Event *schedule(Event *e, bool fast_signal = false);
+  Event *schedule(Event *e);
 
   /** Block of memory to allocate thread specific data e.g. stat system arrays. */
   char thread_private[PER_THREAD_DATA];
@@ -314,9 +313,6 @@ public:
   ProtectedQueue EventQueueExternal;
   PriorityEventQueue EventQueue;
 
-  EThread **ethreads_to_be_signalled = nullptr;
-  int n_ethreads_to_be_signalled     = 0;
-
   static constexpr int NO_ETHREAD_ID = -1;
   int id                             = NO_ETHREAD_ID;
   unsigned int event_types           = 0;
diff --git a/iocore/eventsystem/I_EventProcessor.h b/iocore/eventsystem/I_EventProcessor.h
index 227ea19..782137a 100644
--- a/iocore/eventsystem/I_EventProcessor.h
+++ b/iocore/eventsystem/I_EventProcessor.h
@@ -151,11 +151,7 @@ public:
   */
   Event *schedule_imm(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
                       void *cookie = nullptr);
-  /*
-    provides the same functionality as schedule_imm and also signals the thread immediately
-  */
-  Event *schedule_imm_signal(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
-                             void *cookie = nullptr);
+
   /**
     Schedules the continuation on a specific thread group to receive an
     event at the given timeout. Requests the EventProcessor to schedule
@@ -331,7 +327,7 @@ public:
   | Unix & non NT Interface                                |
   \*------------------------------------------------------*/
 
-  Event *schedule(Event *e, EventType etype, bool fast_signal = false);
+  Event *schedule(Event *e, EventType etype);
   EThread *assign_thread(EventType etype);
   EThread *assign_affinity_by_type(Continuation *cont, EventType etype);
 
diff --git a/iocore/eventsystem/I_ProtectedQueue.h b/iocore/eventsystem/I_ProtectedQueue.h
index 7742fac..8bd0eeb 100644
--- a/iocore/eventsystem/I_ProtectedQueue.h
+++ b/iocore/eventsystem/I_ProtectedQueue.h
@@ -37,7 +37,7 @@
 #include "tscore/ink_platform.h"
 #include "I_Event.h"
 struct ProtectedQueue {
-  void enqueue(Event *e, bool fast_signal = false);
+  void enqueue(Event *e);
   void signal();
   int try_signal();             // Use non blocking lock and if acquired, signal
   void enqueue_local(Event *e); // Safe when called from the same thread
@@ -54,5 +54,3 @@ struct ProtectedQueue {
 
   ProtectedQueue();
 };
-
-void flush_signals(EThread *t);
diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h
index b65e3a7..55f093a 100644
--- a/iocore/eventsystem/P_UnixEThread.h
+++ b/iocore/eventsystem/P_UnixEThread.h
@@ -45,15 +45,6 @@ EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
 }
 
 TS_INLINE Event *
-EThread::schedule_imm_signal(Continuation *cont, int callback_event, void *cookie)
-{
-  Event *e          = ::eventAllocator.alloc();
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, 0, 0), true);
-}
-
-TS_INLINE Event *
 EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
 {
   Event *e          = ::eventAllocator.alloc();
@@ -85,7 +76,7 @@ EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, vo
 }
 
 TS_INLINE Event *
-EThread::schedule(Event *e, bool fast_signal)
+EThread::schedule(Event *e)
 {
   e->ethread = this;
   ink_assert(tt == REGULAR);
@@ -100,7 +91,7 @@ EThread::schedule(Event *e, bool fast_signal)
   // The continuation that gets scheduled later is not always the
   // client VC, it can be HttpCacheSM etc. so save the flags
   e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
-  EventQueueExternal.enqueue(e, fast_signal);
+  EventQueueExternal.enqueue(e);
   return e;
 }
 
diff --git a/iocore/eventsystem/P_UnixEventProcessor.h b/iocore/eventsystem/P_UnixEventProcessor.h
index 6e89d5b..fe89945 100644
--- a/iocore/eventsystem/P_UnixEventProcessor.h
+++ b/iocore/eventsystem/P_UnixEventProcessor.h
@@ -89,7 +89,7 @@ EventProcessor::assign_affinity_by_type(Continuation *cont, EventType etype)
 }
 
 TS_INLINE Event *
-EventProcessor::schedule(Event *e, EventType etype, bool fast_signal)
+EventProcessor::schedule(Event *e, EventType etype)
 {
   ink_assert(etype < MAX_EVENT_TYPES);
 
@@ -116,25 +116,11 @@ EventProcessor::schedule(Event *e, EventType etype, bool fast_signal)
   if (e->continuation->mutex) {
     e->mutex = e->continuation->mutex;
   }
-  e->ethread->EventQueueExternal.enqueue(e, fast_signal);
+  e->ethread->EventQueueExternal.enqueue(e);
   return e;
 }
 
 TS_INLINE Event *
-EventProcessor::schedule_imm_signal(Continuation *cont, EventType et, int callback_event, void *cookie)
-{
-  Event *e = eventAllocator.alloc();
-
-  ink_assert(et < MAX_EVENT_TYPES);
-#ifdef ENABLE_TIME_TRACE
-  e->start_time = Thread::get_hrtime();
-#endif
-  e->callback_event = callback_event;
-  e->cookie         = cookie;
-  return schedule(e->init(cont, 0, 0), et, true);
-}
-
-TS_INLINE Event *
 EventProcessor::schedule_imm(Continuation *cont, EventType et, int callback_event, void *cookie)
 {
   Event *e = eventAllocator.alloc();
diff --git a/iocore/eventsystem/ProtectedQueue.cc b/iocore/eventsystem/ProtectedQueue.cc
index e8741c7..d8a14da 100644
--- a/iocore/eventsystem/ProtectedQueue.cc
+++ b/iocore/eventsystem/ProtectedQueue.cc
@@ -44,7 +44,7 @@
 extern ClassAllocator<Event> eventAllocator;
 
 void
-ProtectedQueue::enqueue(Event *e, bool fast_signal)
+ProtectedQueue::enqueue(Event *e)
 {
   ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
   EThread *e_ethread   = e->ethread;
@@ -62,25 +62,6 @@ ProtectedQueue::enqueue(Event *e, bool fast_signal)
 }
 
 void
-flush_signals(EThread *thr)
-{
-  ink_assert(this_ethread() == thr);
-  int n = thr->n_ethreads_to_be_signalled;
-  if (n > eventProcessor.n_ethreads) {
-    n = eventProcessor.n_ethreads; // MAX
-  }
-  int i;
-
-  for (i = 0; i < n; i++) {
-    if (thr->ethreads_to_be_signalled[i]) {
-      thr->ethreads_to_be_signalled[i]->tail_cb->signalActivity();
-      thr->ethreads_to_be_signalled[i] = nullptr;
-    }
-  }
-  thr->n_ethreads_to_be_signalled = 0;
-}
-
-void
 ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
 {
   (void)cur_time;
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index df44c71..06c9943 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -58,8 +58,6 @@ EThread::EThread()
 
 EThread::EThread(ThreadType att, int anid) : id(anid), tt(att)
 {
-  ethreads_to_be_signalled = static_cast<EThread **>(ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *)));
-  memset(ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *));
   memset(thread_private, 0, PER_THREAD_DATA);
 #if HAVE_EVENTFD
   evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
@@ -94,15 +92,7 @@ EThread::EThread(ThreadType att, Event *e) : tt(att), start_event(e)
 
 // Provide a destructor so that SDK functions which create and destroy
 // threads won't have to deal with EThread memory deallocation.
-EThread::~EThread()
-{
-  if (n_ethreads_to_be_signalled > 0) {
-    flush_signals(this);
-  }
-  ats_free(ethreads_to_be_signalled);
-  // TODO: This can't be deleted ....
-  // delete[]l1_hash;
-}
+EThread::~EThread() {}
 
 bool
 EThread::is_event_type(EventType et)
@@ -273,10 +263,6 @@ EThread::execute_regular()
       sleep_time = 0;
     }
 
-    if (n_ethreads_to_be_signalled) {
-      flush_signals(this);
-    }
-
     tail_cb->waitForActivity(sleep_time);
 
     // loop cleanup
diff --git a/iocore/net/UnixNetAccept.cc b/iocore/net/UnixNetAccept.cc
index 7109a27..89c1c96 100644
--- a/iocore/net/UnixNetAccept.cc
+++ b/iocore/net/UnixNetAccept.cc
@@ -360,7 +360,7 @@ NetAccept::do_blocking_accept(EThread *t)
     NetHandler *h   = get_NetHandler(localt);
     // Assign NetHandler->mutex to NetVC
     vc->mutex = h->mutex;
-    localt->schedule_imm_signal(vc);
+    localt->schedule_imm(vc);
   } while (loop);
 
   return 1;