Posted to commits@trafficserver.apache.org by am...@apache.org on 2017/10/21 19:52:42 UTC
[trafficserver] branch master updated: event loop metrics collection
This is an automated email from the ASF dual-hosted git repository.
amc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 3ab9742 event loop metrics collection
3ab9742 is described below
commit 3ab9742521c6d7112ed6d159de72ef2eb8a02ebf
Author: Fei Deng <fe...@oath.com>
AuthorDate: Thu Oct 19 10:14:32 2017 -0500
event loop metrics collection
---
cmd/traffic_manager/Makefile.am | 2 +-
.../monitoring/statistics/core/misc.en.rst | 34 +++++++
iocore/eventsystem/I_EThread.h | 78 ++++++++++++++-
iocore/eventsystem/UnixEThread.cc | 105 ++++++++++++++++++++-
iocore/eventsystem/UnixEventProcessor.cc | 60 ++++++++++++
5 files changed, 274 insertions(+), 5 deletions(-)
diff --git a/cmd/traffic_manager/Makefile.am b/cmd/traffic_manager/Makefile.am
index cf64df9..3f02a99 100644
--- a/cmd/traffic_manager/Makefile.am
+++ b/cmd/traffic_manager/Makefile.am
@@ -51,9 +51,9 @@ traffic_manager_LDADD = \
$(top_builddir)/mgmt/api/libmgmtapilocal.la \
$(top_builddir)/mgmt/libmgmt_lm.la \
$(top_builddir)/proxy/hdrs/libhdrs.a \
- $(top_builddir)/lib/records/librecords_lm.a \
$(top_builddir)/lib/ts/libtsutil.la \
$(top_builddir)/iocore/eventsystem/libinkevent.a \
+ $(top_builddir)/lib/records/librecords_lm.a \
$(top_builddir)/proxy/shared/libdiagsconfig.a
AM_LDFLAGS += \
diff --git a/doc/admin-guide/monitoring/statistics/core/misc.en.rst b/doc/admin-guide/monitoring/statistics/core/misc.en.rst
index 81cd3db..ba03882 100644
--- a/doc/admin-guide/monitoring/statistics/core/misc.en.rst
+++ b/doc/admin-guide/monitoring/statistics/core/misc.en.rst
@@ -25,3 +25,37 @@ Miscellaneous
.. ts:stat:: global proxy.process.http.misc_count_stat integer
.. ts:stat:: global proxy.process.http.misc_user_agent_bytes_stat integer
+.. ts:stat:: global proxy.process.eventloop.count integer
+   :type: counter
+
+   Number of event loops executed.
+
+.. ts:stat:: global proxy.process.eventloop.events integer
+   :type: counter
+
+   Number of events dispatched.
+
+.. ts:stat:: global proxy.process.eventloop.events.min integer
+   :type: gauge
+
+   Minimum number of events dispatched in a single loop.
+
+.. ts:stat:: global proxy.process.eventloop.events.max integer
+   :type: gauge
+
+   Maximum number of events dispatched in a single loop.
+
+.. ts:stat:: global proxy.process.eventloop.wait integer
+   :type: counter
+
+   Number of loops that did a conditional wait.
+
+.. ts:stat:: global proxy.process.eventloop.time.min integer
+   :unit: nanoseconds
+
+   Shortest time spent in a single loop.
+
+.. ts:stat:: global proxy.process.eventloop.time.max integer
+   :unit: nanoseconds
+
+   Longest time spent in a single loop.
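As registered at runtime, each of these stats carries a timescale suffix built from SAMPLE_COUNT (10s, 100s, 1000s), so the queryable names are e.g. proxy.process.eventloop.count.10s; see the registration loop in UnixEventProcessor.cc below. A minimal query sketch, assuming a running traffic_server and the stock traffic_ctl tool:

    traffic_ctl metric match proxy.process.eventloop
    traffic_ctl metric get proxy.process.eventloop.time.max.10s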
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index 1739df2..e15d7b3 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -330,7 +330,7 @@ public:
void execute() override;
void execute_regular();
- void process_queue(Que(Event, link) * NegativeQueue);
+ void process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count);
void process_event(Event *e, int calling_code);
void free_event(Event *e);
LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;
@@ -379,6 +379,82 @@ public:
friend class EThread;
} DEFAULT_TAIL_HANDLER = EventQueueExternal;
+
+ /// Statistics data for event dispatching.
+ struct EventMetrics {
+ /// Time the loop was active, not including wait time but including event dispatch time.
+ struct LoopTimes {
+ ink_hrtime _start; ///< The time of the first loop for this sample. Used to mark valid entries.
+ ink_hrtime _min; ///< Shortest loop time.
+ ink_hrtime _max; ///< Longest loop time.
+ LoopTimes() : _start(0), _min(INT64_MAX), _max(0) {}
+ } _loop_time;
+
+ struct Events {
+ int _min;
+ int _max;
+ int _total;
+ Events() : _min(INT_MAX), _max(0), _total(0) {}
+ } _events;
+
+ int _count; ///< # of times the loop executed.
+ int _wait; ///< # of timed waits for events
+
+ /// Add @a that to @a this data.
+ /// This embodies the custom logic per member concerning whether each is a sum, min, or max.
+ EventMetrics &operator+=(EventMetrics const &that);
+
+ EventMetrics() : _count(0), _wait(0) {}
+ };
+
+ /** The number of metric blocks kept.
+ This is a circular buffer, with one block per second. We keep a bit more than the required 1000
+ to provide sufficient slop for cross-thread reading of the data (as only the current metric block
+ is being updated).
+ */
+ static int const N_EVENT_METRICS = 1024;
+
+ volatile EventMetrics *current_metric; ///< The current element of @a metrics
+ EventMetrics metrics[N_EVENT_METRICS];
+
+ /** The various stats provided to the administrator.
+ THE ORDER IS VERY SENSITIVE.
+ More than one part of the code depends on this exact order. Be careful and thorough when changing.
+ */
+ enum STAT_ID {
+ STAT_LOOP_COUNT, ///< # of event loops executed.
+ STAT_LOOP_EVENTS, ///< total # of events dispatched
+ STAT_LOOP_EVENTS_MIN, ///< min # of events dispatched in a loop
+ STAT_LOOP_EVENTS_MAX, ///< max # of events dispatched in a loop
+ STAT_LOOP_WAIT, ///< # of loops that did a conditional wait.
+ STAT_LOOP_TIME_MIN, ///< Shortest time spent in loop.
+ STAT_LOOP_TIME_MAX, ///< Longest time spent in loop.
+ N_EVENT_STATS ///< NOT A VALID STAT INDEX - # of different stat types.
+ };
+
+ static char const *const STAT_NAME[N_EVENT_STATS];
+
+ /** The number of time scales used in the event statistics.
+ Currently these are 10s, 100s, 1000s.
+ */
+ static int const N_EVENT_TIMESCALES = 3;
+ /// # of samples for each time scale.
+ static int const SAMPLE_COUNT[N_EVENT_TIMESCALES];
+
+ /// Process the last 1000s of data and write out the summaries to @a summary.
+ void summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES]);
+ /// Back up the metric pointer, wrapping as needed.
+ EventMetrics *
+ prev(EventMetrics volatile *current)
+ {
+ return const_cast<EventMetrics *>(--current < metrics ? &metrics[N_EVENT_METRICS - 1] : current); // cast to remove volatile
+ }
+ /// Advance the metric pointer, wrapping as needed.
+ EventMetrics *
+ next(EventMetrics volatile *current)
+ {
+ return const_cast<EventMetrics *>(++current > &metrics[N_EVENT_METRICS - 1] ? metrics : current); // cast to remove volatile
+ }
};
/**
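The prev()/next() helpers above implement a fixed-size ring over metrics[]. A standalone sketch of the same wrap-around arithmetic, shrunk to a hypothetical 4-slot ring (illustrative only, not part of the patch):

    #include <cassert>

    struct Ring {
      static int const N = 4; // stand-in for N_EVENT_METRICS
      int slot[N];
      // Step back one slot, wrapping from the first slot to the last.
      int *prev(int *cur) { return --cur < slot ? &slot[N - 1] : cur; }
      // Step forward one slot, wrapping from the last slot to the first.
      int *next(int *cur) { return ++cur > &slot[N - 1] ? slot : cur; }
    };

    int
    main()
    {
      Ring r;
      assert(r.prev(&r.slot[0]) == &r.slot[3]); // backward wrap
      assert(r.next(&r.slot[3]) == &r.slot[0]); // forward wrap
      assert(r.next(&r.slot[1]) == &r.slot[2]); // normal advance
      return 0;
    }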
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index 82d074f..7fbede5 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -38,6 +38,14 @@ struct AIOCallback;
#define NO_HEARTBEAT -1
#define THREAD_MAX_HEARTBEAT_MSECONDS 60
+// !! THIS MUST BE IN THE ENUM ORDER !!
+char const *const EThread::STAT_NAME[] = {"proxy.process.eventloop.count", "proxy.process.eventloop.events",
+ "proxy.process.eventloop.events.min", "proxy.process.eventloop.events.max",
+ "proxy.process.eventloop.wait", "proxy.process.eventloop.time.min",
+ "proxy.process.eventloop.time.max"};
+
+int const EThread::SAMPLE_COUNT[N_EVENT_TIMESCALES] = {10, 100, 1000};
+
bool shutdown_event_system = false;
EThread::EThread()
@@ -142,7 +150,8 @@ EThread::process_event(Event *e, int calling_code)
}
}
-void EThread::process_queue(Que(Event, link) * NegativeQueue)
+void
+EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count)
{
Event *e;
@@ -152,6 +161,7 @@ void EThread::process_queue(Que(Event, link) * NegativeQueue)
// execute all the available external events that have
// already been dequeued
while ((e = EventQueueExternal.dequeue_local())) {
+ ++(*ev_count);
if (e->cancelled) {
free_event(e);
} else if (!e->timeout_at) { // IMMEDIATE
@@ -172,6 +182,7 @@ void EThread::process_queue(Que(Event, link) * NegativeQueue)
NegativeQueue->insert(e, p);
}
}
+ ++(*nq_count);
}
}
@@ -181,6 +192,18 @@ EThread::execute_regular()
Event *e;
Que(Event, link) NegativeQueue;
ink_hrtime next_time = 0;
+ ink_hrtime delta = 0; // time spent in the event loop
+ ink_hrtime loop_start_time; // Time the loop started.
+ ink_hrtime loop_finish_time; // Time at the end of the loop.
+
+ // Track this so we can update on boundary crossing.
+ EventMetrics *prev_metric = this->prev(metrics + (ink_get_hrtime_internal() / HRTIME_SECOND) % N_EVENT_METRICS);
+
+ int nq_count = 0;
+ int ev_count = 0;
+
+ // A statically initialized instance we can use as a prototype for initializing other instances.
+ static EventMetrics METRIC_INIT;
// give priority to immediate events
for (;;) {
@@ -188,7 +211,22 @@ EThread::execute_regular()
return;
}
- process_queue(&NegativeQueue);
+ loop_start_time = Thread::get_hrtime_updated();
+ nq_count = 0; // count # of elements put on negative queue.
+ ev_count = 0; // # of events handled.
+
+ current_metric = metrics + (loop_start_time / HRTIME_SECOND) % N_EVENT_METRICS;
+ if (current_metric != prev_metric) {
+ // Normally only one entry needs to be reset, but if more than one second elapsed since the
+ // last loop, several entries may have been skipped, so clear them all.
+ do {
+ memcpy((prev_metric = this->next(prev_metric)), &METRIC_INIT, sizeof(METRIC_INIT));
+ } while (current_metric != prev_metric);
+ current_metric->_loop_time._start = loop_start_time;
+ }
+ ++(current_metric->_count);
+
+ process_queue(&NegativeQueue, &ev_count, &nq_count);
bool done_one;
do {
@@ -209,7 +247,7 @@ EThread::execute_regular()
// execute any negative (poll) events
if (NegativeQueue.head) {
- process_queue(&NegativeQueue);
+ process_queue(&NegativeQueue, &ev_count, &nq_count);
// execute poll events
while ((e = NegativeQueue.dequeue())) {
@@ -221,6 +259,7 @@ EThread::execute_regular()
ink_hrtime sleep_time = next_time - Thread::get_hrtime_updated();
if (sleep_time > 0) {
sleep_time = std::min(sleep_time, HRTIME_MSECONDS(THREAD_MAX_HEARTBEAT_MSECONDS));
+ ++(current_metric->_wait);
} else {
sleep_time = 0;
}
@@ -230,6 +269,29 @@ EThread::execute_regular()
}
tail_cb->waitForActivity(sleep_time);
+
+ // loop cleanup
+ loop_finish_time = this->get_hrtime_updated();
+ delta = loop_finish_time - loop_start_time;
+
+ // This can happen due to time of day adjustments (which apparently happen quite frequently). I
+ // tried using the monotonic clock to get around this but it was *very* stuttery (up to hundreds
+ // of milliseconds), far too much to be actually used.
+ if (delta > 0) {
+ if (delta > current_metric->_loop_time._max) {
+ current_metric->_loop_time._max = delta;
+ }
+ if (delta < current_metric->_loop_time._min) {
+ current_metric->_loop_time._min = delta;
+ }
+ }
+ if (ev_count < current_metric->_events._min) {
+ current_metric->_events._min = ev_count;
+ }
+ if (ev_count > current_metric->_events._max) {
+ current_metric->_events._max = ev_count;
+ }
+ current_metric->_events._total += ev_count;
}
}
@@ -271,3 +333,40 @@ EThread::execute()
} /* End switch */
// coverity[missing_unlock]
}
+
+EThread::EventMetrics &
+EThread::EventMetrics::operator+=(EventMetrics const &that)
+{
+ this->_events._max = std::max(this->_events._max, that._events._max);
+ this->_events._min = std::min(this->_events._min, that._events._min);
+ this->_events._total += that._events._total;
+ this->_loop_time._min = std::min(this->_loop_time._min, that._loop_time._min);
+ this->_loop_time._max = std::max(this->_loop_time._max, that._loop_time._max);
+ this->_count += that._count;
+ this->_wait += that._wait;
+ return *this;
+}
+
+void
+EThread::summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES])
+{
+ // Accumulate in local first so each sample only needs to be processed once,
+ // not N_EVENT_TIMESCALES times.
+ EventMetrics sum;
+
+ // To avoid race conditions, we back up one from the current metric block. It's close enough
+ // and won't be updated during the time this method runs so it should be thread safe.
+ EventMetrics *m = this->prev(current_metric);
+
+ for (int t = 0; t < N_EVENT_TIMESCALES; ++t) {
+ int count = SAMPLE_COUNT[t];
+ if (t > 0)
+ count -= SAMPLE_COUNT[t - 1];
+ while (--count >= 0) {
+ if (0 != m->_loop_time._start)
+ sum += *m;
+ m = this->prev(m);
+ }
+ summary[t] += sum; // push out to return vector.
+ }
+}
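To make the sample accounting in summarize_stats() concrete: with SAMPLE_COUNT = {10, 100, 1000}, the inner loop walks 10 blocks for t == 0, then only 100 - 10 == 90 additional blocks for t == 1, and 1000 - 100 == 900 more for t == 2. Because sum is never reset between timescales, summary[0] accumulates the most recent 10 one-second blocks, summary[1] the most recent 100, and summary[2] the most recent 1000, while each block is still visited exactly once.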
diff --git a/iocore/eventsystem/UnixEventProcessor.cc b/iocore/eventsystem/UnixEventProcessor.cc
index 7fc31e7..db7d3f4 100644
--- a/iocore/eventsystem/UnixEventProcessor.cc
+++ b/iocore/eventsystem/UnixEventProcessor.cc
@@ -72,6 +72,52 @@ ThreadAffinityInitializer Thread_Affinity_Initializer;
namespace
{
+int
+EventMetricStatSync(const char *, RecDataT, RecData *, RecRawStatBlock *rsb, int)
+{
+ int id = 0;
+ EThread::EventMetrics summary[EThread::N_EVENT_TIMESCALES];
+
+ // scan the thread local values
+ for (int i = 0; i < eventProcessor.n_ethreads; ++i) {
+ eventProcessor.all_ethreads[i]->summarize_stats(summary);
+ }
+
+ ink_mutex_acquire(&(rsb->mutex));
+
+ for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx, id += EThread::N_EVENT_STATS) {
+ EThread::EventMetrics *m = summary + ts_idx;
+ // The atomic swaps used elsewhere for global stat writes are skipped here; they don't appear to provide any benefit in this callback.
+ rsb->global[id + EThread::STAT_LOOP_COUNT]->sum = m->_count;
+ rsb->global[id + EThread::STAT_LOOP_COUNT]->count = 1;
+ RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_COUNT);
+
+ rsb->global[id + EThread::STAT_LOOP_WAIT]->sum = m->_wait;
+ rsb->global[id + EThread::STAT_LOOP_WAIT]->count = 1;
+ RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_WAIT);
+
+ rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->sum = m->_loop_time._min;
+ rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->count = 1;
+ RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MIN);
+ rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->sum = m->_loop_time._max;
+ rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->count = 1;
+ RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MAX);
+
+ rsb->global[id + EThread::STAT_LOOP_EVENTS]->sum = m->_events._total;
+ rsb->global[id + EThread::STAT_LOOP_EVENTS]->count = 1;
+ RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS);
+ rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->sum = m->_events._min;
+ rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->count = 1;
+ RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MIN);
+ rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->sum = m->_events._max;
+ rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->count = 1;
+ RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MAX);
+ }
+
+ ink_mutex_release(&(rsb->mutex));
+ return REC_ERR_OKAY;
+}
+
/// This is a wrapper used to convert a static function into a continuation. The function pointer is
/// passed in the cookie. For this reason the class is used as a singleton.
/// @internal This is the implementation for @c schedule_spawn... overloads.
@@ -382,6 +428,20 @@ EventProcessor::start(int n_event_threads, size_t stacksize)
// infrastructure being in place (e.g. the proxy allocators).
thread_group[ET_CALL]._spawnQueue.push(make_event_for_scheduling(&Thread_Affinity_Initializer, EVENT_IMMEDIATE, nullptr));
+ // Get our statistics set up
+ RecRawStatBlock *rsb = RecAllocateRawStatBlock(EThread::N_EVENT_STATS * EThread::N_EVENT_TIMESCALES);
+ char name[256];
+
+ for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx) {
+ for (int id = 0; id < EThread::N_EVENT_STATS; ++id) {
+ snprintf(name, sizeof(name), "%s.%ds", EThread::STAT_NAME[id], EThread::SAMPLE_COUNT[ts_idx]);
+ RecRegisterRawStat(rsb, RECT_PROCESS, name, RECD_INT, RECP_NON_PERSISTENT, id + (ts_idx * EThread::N_EVENT_STATS), NULL);
+ }
+ }
+
+ // The name must be that of a registered stat; any of them will do, since a single callback syncs all of them in one pass.
+ RecRegisterRawStatSyncCb(name, EventMetricStatSync, rsb, 0);
+
this->spawn_event_threads(ET_CALL, n_event_threads, stacksize);
Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);
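For reference, the raw stat block above is laid out with stride N_EVENT_STATS, i.e. index = ts_idx * N_EVENT_STATS + stat_id, which is the same arithmetic EventMetricStatSync() uses when writing the globals. As a worked example: with N_EVENT_STATS == 7 the block holds 21 entries, and STAT_LOOP_TIME_MAX (enum value 6) at the 1000s timescale (ts_idx == 2) is registered as proxy.process.eventloop.time.max.1000s at index 2 * 7 + 6 == 20.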