You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2020/05/20 17:50:14 UTC
[trafficserver] 01/02: cleanup the eventloop
This is an automated email from the ASF dual-hosted git repository.
zwoop pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit d15c45484e15ec7c141c6af66116c57dd2ce62e2
Author: Fei Deng <du...@gmail.com>
AuthorDate: Wed Oct 30 15:55:15 2019 -0500
cleanup the eventloop
(cherry picked from commit c179620382dc37eb33f39357f458175800756f14)
Conflicts:
iocore/net/UnixNetAccept.cc
---
iocore/aio/AIO.cc | 4 ++--
iocore/cache/CacheWrite.cc | 6 +++---
iocore/dns/DNS.cc | 2 +-
iocore/eventsystem/I_EThread.h | 6 +-----
iocore/eventsystem/I_EventProcessor.h | 8 ++------
iocore/eventsystem/I_ProtectedQueue.h | 4 +---
iocore/eventsystem/P_UnixEThread.h | 13 ++-----------
iocore/eventsystem/P_UnixEventProcessor.h | 18 ++----------------
iocore/eventsystem/ProtectedQueue.cc | 21 +--------------------
iocore/eventsystem/UnixEThread.cc | 16 +---------------
iocore/net/UnixNetAccept.cc | 2 +-
11 files changed, 17 insertions(+), 83 deletions(-)
diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
index 5543919..df8e27b 100644
--- a/iocore/aio/AIO.cc
+++ b/iocore/aio/AIO.cc
@@ -480,9 +480,9 @@ aio_thread_main(void *arg)
SCOPED_MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
op->handleEvent(EVENT_NONE, nullptr);
} else if (op->thread == AIO_CALLBACK_THREAD_ANY) {
- eventProcessor.schedule_imm_signal(op);
+ eventProcessor.schedule_imm(op);
} else {
- op->thread->schedule_imm_signal(op);
+ op->thread->schedule_imm(op);
}
ink_mutex_acquire(&my_aio_req->aio_mutex);
} while (true);
diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index 944d527..cff1818 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -360,7 +360,7 @@ Vol::aggWriteDone(int event, Event *e)
CacheVC *c = nullptr;
while ((c = sync.dequeue())) {
if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
- eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+ eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
} else {
sync.push(c); // put it back on the front
break;
@@ -1028,7 +1028,7 @@ Lagain:
ink_assert(false);
while ((c = agg.dequeue())) {
agg_todo_size -= c->agg_len;
- eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+ eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
}
return EVENT_CONT;
}
@@ -1092,7 +1092,7 @@ Lwait:
if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
ret = EVENT_RETURN;
} else {
- eventProcessor.schedule_imm_signal(c, ET_CALL, AIO_EVENT_DONE);
+ eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
}
}
return ret;
diff --git a/iocore/dns/DNS.cc b/iocore/dns/DNS.cc
index 6713003..d33c4dc 100644
--- a/iocore/dns/DNS.cc
+++ b/iocore/dns/DNS.cc
@@ -1437,7 +1437,7 @@ DNSEntry::post(DNSHandler *h, HostEnt *ent)
} else {
mutex = action.mutex;
SET_HANDLER(&DNSEntry::postOneEvent);
- submit_thread->schedule_imm_signal(this);
+ submit_thread->schedule_imm(this);
}
return 0;
}
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index c70fb04..45afc73 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -126,7 +126,6 @@ public:
*/
Event *schedule_imm(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
- Event *schedule_imm_signal(Continuation *c, int callback_event = EVENT_IMMEDIATE, void *cookie = nullptr);
/**
Schedules the continuation on this EThread to receive an event
@@ -300,7 +299,7 @@ public:
EThread &operator=(const EThread &) = delete;
~EThread() override;
- Event *schedule(Event *e, bool fast_signal = false);
+ Event *schedule(Event *e);
/** Block of memory to allocate thread specific data e.g. stat system arrays. */
char thread_private[PER_THREAD_DATA];
@@ -314,9 +313,6 @@ public:
ProtectedQueue EventQueueExternal;
PriorityEventQueue EventQueue;
- EThread **ethreads_to_be_signalled = nullptr;
- int n_ethreads_to_be_signalled = 0;
-
static constexpr int NO_ETHREAD_ID = -1;
int id = NO_ETHREAD_ID;
unsigned int event_types = 0;
diff --git a/iocore/eventsystem/I_EventProcessor.h b/iocore/eventsystem/I_EventProcessor.h
index 227ea19..782137a 100644
--- a/iocore/eventsystem/I_EventProcessor.h
+++ b/iocore/eventsystem/I_EventProcessor.h
@@ -151,11 +151,7 @@ public:
*/
Event *schedule_imm(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
void *cookie = nullptr);
- /*
- provides the same functionality as schedule_imm and also signals the thread immediately
- */
- Event *schedule_imm_signal(Continuation *c, EventType event_type = ET_CALL, int callback_event = EVENT_IMMEDIATE,
- void *cookie = nullptr);
+
/**
Schedules the continuation on a specific thread group to receive an
event at the given timeout. Requests the EventProcessor to schedule
@@ -331,7 +327,7 @@ public:
| Unix & non NT Interface |
\*------------------------------------------------------*/
- Event *schedule(Event *e, EventType etype, bool fast_signal = false);
+ Event *schedule(Event *e, EventType etype);
EThread *assign_thread(EventType etype);
EThread *assign_affinity_by_type(Continuation *cont, EventType etype);
diff --git a/iocore/eventsystem/I_ProtectedQueue.h b/iocore/eventsystem/I_ProtectedQueue.h
index 7742fac..8bd0eeb 100644
--- a/iocore/eventsystem/I_ProtectedQueue.h
+++ b/iocore/eventsystem/I_ProtectedQueue.h
@@ -37,7 +37,7 @@
#include "tscore/ink_platform.h"
#include "I_Event.h"
struct ProtectedQueue {
- void enqueue(Event *e, bool fast_signal = false);
+ void enqueue(Event *e);
void signal();
int try_signal(); // Use non blocking lock and if acquired, signal
void enqueue_local(Event *e); // Safe when called from the same thread
@@ -54,5 +54,3 @@ struct ProtectedQueue {
ProtectedQueue();
};
-
-void flush_signals(EThread *t);
diff --git a/iocore/eventsystem/P_UnixEThread.h b/iocore/eventsystem/P_UnixEThread.h
index b65e3a7..55f093a 100644
--- a/iocore/eventsystem/P_UnixEThread.h
+++ b/iocore/eventsystem/P_UnixEThread.h
@@ -45,15 +45,6 @@ EThread::schedule_imm(Continuation *cont, int callback_event, void *cookie)
}
TS_INLINE Event *
-EThread::schedule_imm_signal(Continuation *cont, int callback_event, void *cookie)
-{
- Event *e = ::eventAllocator.alloc();
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule(e->init(cont, 0, 0), true);
-}
-
-TS_INLINE Event *
EThread::schedule_at(Continuation *cont, ink_hrtime t, int callback_event, void *cookie)
{
Event *e = ::eventAllocator.alloc();
@@ -85,7 +76,7 @@ EThread::schedule_every(Continuation *cont, ink_hrtime t, int callback_event, vo
}
TS_INLINE Event *
-EThread::schedule(Event *e, bool fast_signal)
+EThread::schedule(Event *e)
{
e->ethread = this;
ink_assert(tt == REGULAR);
@@ -100,7 +91,7 @@ EThread::schedule(Event *e, bool fast_signal)
// The continuation that gets scheduled later is not always the
// client VC, it can be HttpCacheSM etc. so save the flags
e->continuation->control_flags.set_flags(get_cont_flags().get_flags());
- EventQueueExternal.enqueue(e, fast_signal);
+ EventQueueExternal.enqueue(e);
return e;
}
diff --git a/iocore/eventsystem/P_UnixEventProcessor.h b/iocore/eventsystem/P_UnixEventProcessor.h
index 6e89d5b..fe89945 100644
--- a/iocore/eventsystem/P_UnixEventProcessor.h
+++ b/iocore/eventsystem/P_UnixEventProcessor.h
@@ -89,7 +89,7 @@ EventProcessor::assign_affinity_by_type(Continuation *cont, EventType etype)
}
TS_INLINE Event *
-EventProcessor::schedule(Event *e, EventType etype, bool fast_signal)
+EventProcessor::schedule(Event *e, EventType etype)
{
ink_assert(etype < MAX_EVENT_TYPES);
@@ -116,25 +116,11 @@ EventProcessor::schedule(Event *e, EventType etype, bool fast_signal)
if (e->continuation->mutex) {
e->mutex = e->continuation->mutex;
}
- e->ethread->EventQueueExternal.enqueue(e, fast_signal);
+ e->ethread->EventQueueExternal.enqueue(e);
return e;
}
TS_INLINE Event *
-EventProcessor::schedule_imm_signal(Continuation *cont, EventType et, int callback_event, void *cookie)
-{
- Event *e = eventAllocator.alloc();
-
- ink_assert(et < MAX_EVENT_TYPES);
-#ifdef ENABLE_TIME_TRACE
- e->start_time = Thread::get_hrtime();
-#endif
- e->callback_event = callback_event;
- e->cookie = cookie;
- return schedule(e->init(cont, 0, 0), et, true);
-}
-
-TS_INLINE Event *
EventProcessor::schedule_imm(Continuation *cont, EventType et, int callback_event, void *cookie)
{
Event *e = eventAllocator.alloc();
diff --git a/iocore/eventsystem/ProtectedQueue.cc b/iocore/eventsystem/ProtectedQueue.cc
index e8741c7..d8a14da 100644
--- a/iocore/eventsystem/ProtectedQueue.cc
+++ b/iocore/eventsystem/ProtectedQueue.cc
@@ -44,7 +44,7 @@
extern ClassAllocator<Event> eventAllocator;
void
-ProtectedQueue::enqueue(Event *e, bool fast_signal)
+ProtectedQueue::enqueue(Event *e)
{
ink_assert(!e->in_the_prot_queue && !e->in_the_priority_queue);
EThread *e_ethread = e->ethread;
@@ -62,25 +62,6 @@ ProtectedQueue::enqueue(Event *e, bool fast_signal)
}
void
-flush_signals(EThread *thr)
-{
- ink_assert(this_ethread() == thr);
- int n = thr->n_ethreads_to_be_signalled;
- if (n > eventProcessor.n_ethreads) {
- n = eventProcessor.n_ethreads; // MAX
- }
- int i;
-
- for (i = 0; i < n; i++) {
- if (thr->ethreads_to_be_signalled[i]) {
- thr->ethreads_to_be_signalled[i]->tail_cb->signalActivity();
- thr->ethreads_to_be_signalled[i] = nullptr;
- }
- }
- thr->n_ethreads_to_be_signalled = 0;
-}
-
-void
ProtectedQueue::dequeue_timed(ink_hrtime cur_time, ink_hrtime timeout, bool sleep)
{
(void)cur_time;
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index df44c71..06c9943 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -58,8 +58,6 @@ EThread::EThread()
EThread::EThread(ThreadType att, int anid) : id(anid), tt(att)
{
- ethreads_to_be_signalled = static_cast<EThread **>(ats_malloc(MAX_EVENT_THREADS * sizeof(EThread *)));
- memset(ethreads_to_be_signalled, 0, MAX_EVENT_THREADS * sizeof(EThread *));
memset(thread_private, 0, PER_THREAD_DATA);
#if HAVE_EVENTFD
evfd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
@@ -94,15 +92,7 @@ EThread::EThread(ThreadType att, Event *e) : tt(att), start_event(e)
// Provide a destructor so that SDK functions which create and destroy
// threads won't have to deal with EThread memory deallocation.
-EThread::~EThread()
-{
- if (n_ethreads_to_be_signalled > 0) {
- flush_signals(this);
- }
- ats_free(ethreads_to_be_signalled);
- // TODO: This can't be deleted ....
- // delete[]l1_hash;
-}
+EThread::~EThread() {}
bool
EThread::is_event_type(EventType et)
@@ -273,10 +263,6 @@ EThread::execute_regular()
sleep_time = 0;
}
- if (n_ethreads_to_be_signalled) {
- flush_signals(this);
- }
-
tail_cb->waitForActivity(sleep_time);
// loop cleanup
diff --git a/iocore/net/UnixNetAccept.cc b/iocore/net/UnixNetAccept.cc
index 7109a27..89c1c96 100644
--- a/iocore/net/UnixNetAccept.cc
+++ b/iocore/net/UnixNetAccept.cc
@@ -360,7 +360,7 @@ NetAccept::do_blocking_accept(EThread *t)
NetHandler *h = get_NetHandler(localt);
// Assign NetHandler->mutex to NetVC
vc->mutex = h->mutex;
- localt->schedule_imm_signal(vc);
+ localt->schedule_imm(vc);
} while (loop);
return 1;