You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2017/10/21 19:52:42 UTC

[trafficserver] branch master updated: event loop metrics collection

This is an automated email from the ASF dual-hosted git repository.

amc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ab9742  event loop metrics collection
3ab9742 is described below

commit 3ab9742521c6d7112ed6d159de72ef2eb8a02ebf
Author: Fei Deng <fe...@oath.com>
AuthorDate: Thu Oct 19 10:14:32 2017 -0500

    event loop metrics collection
---
 cmd/traffic_manager/Makefile.am                    |   2 +-
 .../monitoring/statistics/core/misc.en.rst         |  34 +++++++
 iocore/eventsystem/I_EThread.h                     |  78 ++++++++++++++-
 iocore/eventsystem/UnixEThread.cc                  | 105 ++++++++++++++++++++-
 iocore/eventsystem/UnixEventProcessor.cc           |  60 ++++++++++++
 5 files changed, 274 insertions(+), 5 deletions(-)

diff --git a/cmd/traffic_manager/Makefile.am b/cmd/traffic_manager/Makefile.am
index cf64df9..3f02a99 100644
--- a/cmd/traffic_manager/Makefile.am
+++ b/cmd/traffic_manager/Makefile.am
@@ -51,9 +51,9 @@ traffic_manager_LDADD = \
   $(top_builddir)/mgmt/api/libmgmtapilocal.la \
   $(top_builddir)/mgmt/libmgmt_lm.la \
   $(top_builddir)/proxy/hdrs/libhdrs.a \
-  $(top_builddir)/lib/records/librecords_lm.a \
   $(top_builddir)/lib/ts/libtsutil.la \
   $(top_builddir)/iocore/eventsystem/libinkevent.a \
+  $(top_builddir)/lib/records/librecords_lm.a \
   $(top_builddir)/proxy/shared/libdiagsconfig.a
 
 AM_LDFLAGS += \
diff --git a/doc/admin-guide/monitoring/statistics/core/misc.en.rst b/doc/admin-guide/monitoring/statistics/core/misc.en.rst
index 81cd3db..ba03882 100644
--- a/doc/admin-guide/monitoring/statistics/core/misc.en.rst
+++ b/doc/admin-guide/monitoring/statistics/core/misc.en.rst
@@ -25,3 +25,37 @@ Miscellaneous
 .. ts:stat:: global proxy.process.http.misc_count_stat integer
 .. ts:stat:: global proxy.process.http.misc_user_agent_bytes_stat integer
 
+.. ts:stat:: global proxy.process.eventloop count integer
+    :unit: nanoseconds
+
+    Number of event loops executed.
+
+.. ts:stat:: global proxy.process.eventloop events integer
+    :unit: nanoseconds
+
+    Number of events executed.
+
+.. ts:stat:: global proxy.process.eventloop events.min integer
+    :unit: nanoseconds
+
+    Minimum number of events dispatched in a loop.
+
+.. ts:stat:: global proxy.process.eventloop events.max integer
+    :unit: nanoseconds
+
+    Maximum number of events dispatched in a loop.
+
+.. ts:stat:: global proxy.process.eventloop wait integer
+    :unit: nanoseconds
+
+    Number of loops that did a conditional wait.
+
+.. ts:stat:: global proxy.process.eventloop time.min integer
+    :unit: nanoseconds
+
+    Shortest time spent in loop.
+
+.. ts:stat:: global proxy.process.eventloop time.max integer
+    :unit: nanoseconds
+
+    Longest time spent in loop.
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index 1739df2..e15d7b3 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -330,7 +330,7 @@ public:
 
   void execute() override;
   void execute_regular();
-  void process_queue(Que(Event, link) * NegativeQueue);
+  void process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count);
   void process_event(Event *e, int calling_code);
   void free_event(Event *e);
   LoopTailHandler *tail_cb = &DEFAULT_TAIL_HANDLER;
@@ -379,6 +379,82 @@ public:
 
     friend class EThread;
   } DEFAULT_TAIL_HANDLER = EventQueueExternal;
+
+  /// Statistics data for event dispatching.
+  struct EventMetrics {
+    /// Time the loop was active, not including wait time but including event dispatch time.
+    struct LoopTimes {
+      ink_hrtime _start; ///< The time of the first loop for this sample. Used to mark valid entries.
+      ink_hrtime _min;   ///< Shortest loop time.
+      ink_hrtime _max;   ///< Longest loop time.
+      LoopTimes() : _start(0), _min(INT64_MAX), _max(0) {}
+    } _loop_time;
+
+    struct Events {
+      int _min;
+      int _max;
+      int _total;
+      Events() : _min(INT_MAX), _max(0), _total(0) {}
+    } _events;
+
+    int _count; ///< # of times the loop executed.
+    int _wait;  ///< # of timed wait for events
+
+    /// Add @a that to @a this data.
+    /// This embodies the custom logic per member concerning whether each is a sum, min, or max.
+    EventMetrics &operator+=(EventMetrics const &that);
+
+    EventMetrics() : _count(0), _wait(0) {}
+  };
+
+  /** The number of metric blocks kept.
+      This is a circular buffer, with one block per second. We have a bit more than the required 1000
+      to provide sufficient slop for cross thread reading of the data (as only the current metric block
+      is being updated).
+  */
+  static int const N_EVENT_METRICS = 1024;
+
+  volatile EventMetrics *current_metric; ///< The current element of @a metrics
+  EventMetrics metrics[N_EVENT_METRICS];
+
+  /** The various stats provided to the administrator.
+      THE ORDER IS VERY SENSITIVE.
+      More than one part of the code depends on this exact order. Be careful and thorough when changing.
+  */
+  enum STAT_ID {
+    STAT_LOOP_COUNT,      ///< # of event loops executed.
+    STAT_LOOP_EVENTS,     ///< # of events
+    STAT_LOOP_EVENTS_MIN, ///< min # of events dispatched in a loop
+    STAT_LOOP_EVENTS_MAX, ///< max # of events dispatched in a loop
+    STAT_LOOP_WAIT,       ///< # of loops that did a conditional wait.
+    STAT_LOOP_TIME_MIN,   ///< Shortest time spent in loop.
+    STAT_LOOP_TIME_MAX,   ///< Longest time spent in loop.
+    N_EVENT_STATS         ///< NOT A VALID STAT INDEX - # of different stat types.
+  };
+
+  static char const *const STAT_NAME[N_EVENT_STATS];
+
+  /** The number of time scales used in the event statistics.
+      Currently these are 10s, 100s, 1000s.
+  */
+  static int const N_EVENT_TIMESCALES = 3;
+  /// # of samples for each time scale.
+  static int const SAMPLE_COUNT[N_EVENT_TIMESCALES];
+
+  /// Process the last 1000s of data and write out the summaries to @a summary.
+  void summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES]);
+  /// Back up the metric pointer, wrapping as needed.
+  EventMetrics *
+  prev(EventMetrics volatile *current)
+  {
+    return const_cast<EventMetrics *>(--current < metrics ? &metrics[N_EVENT_METRICS - 1] : current); // cast to remove volatile
+  }
+  /// Advance the metric pointer, wrapping as needed.
+  EventMetrics *
+  next(EventMetrics volatile *current)
+  {
+    return const_cast<EventMetrics *>(++current > &metrics[N_EVENT_METRICS - 1] ? metrics : current); // cast to remove volatile
+  }
 };
 
 /**
diff --git a/iocore/eventsystem/UnixEThread.cc b/iocore/eventsystem/UnixEThread.cc
index 82d074f..7fbede5 100644
--- a/iocore/eventsystem/UnixEThread.cc
+++ b/iocore/eventsystem/UnixEThread.cc
@@ -38,6 +38,14 @@ struct AIOCallback;
 #define NO_HEARTBEAT -1
 #define THREAD_MAX_HEARTBEAT_MSECONDS 60
 
+// !! THIS MUST BE IN THE ENUM ORDER !!
+char const *const EThread::STAT_NAME[] = {"proxy.process.eventloop.count",      "proxy.process.eventloop.events",
+                                          "proxy.process.eventloop.events.min", "proxy.process.eventloop.events.max",
+                                          "proxy.process.eventloop.wait",       "proxy.process.eventloop.time.min",
+                                          "proxy.process.eventloop.time.max"};
+
+int const EThread::SAMPLE_COUNT[N_EVENT_TIMESCALES] = {10, 100, 1000};
+
 bool shutdown_event_system = false;
 
 EThread::EThread()
@@ -142,7 +150,8 @@ EThread::process_event(Event *e, int calling_code)
   }
 }
 
-void EThread::process_queue(Que(Event, link) * NegativeQueue)
+void
+EThread::process_queue(Que(Event, link) * NegativeQueue, int *ev_count, int *nq_count)
 {
   Event *e;
 
@@ -152,6 +161,7 @@ void EThread::process_queue(Que(Event, link) * NegativeQueue)
   // execute all the available external events that have
   // already been dequeued
   while ((e = EventQueueExternal.dequeue_local())) {
+    ++(*ev_count);
     if (e->cancelled) {
       free_event(e);
     } else if (!e->timeout_at) { // IMMEDIATE
@@ -172,6 +182,7 @@ void EThread::process_queue(Que(Event, link) * NegativeQueue)
         NegativeQueue->insert(e, p);
       }
     }
+    ++(*nq_count);
   }
 }
 
@@ -181,6 +192,18 @@ EThread::execute_regular()
   Event *e;
   Que(Event, link) NegativeQueue;
   ink_hrtime next_time = 0;
+  ink_hrtime delta     = 0;    // time spent in the event loop
+  ink_hrtime loop_start_time;  // Time the loop started.
+  ink_hrtime loop_finish_time; // Time at the end of the loop.
+
+  // Track this so we can update on boundary crossing.
+  EventMetrics *prev_metric = this->prev(metrics + (ink_get_hrtime_internal() / HRTIME_SECOND) % N_EVENT_METRICS);
+
+  int nq_count = 0;
+  int ev_count = 0;
+
+  // A statically initialized instance we can use as a prototype for initializing other instances.
+  static EventMetrics METRIC_INIT;
 
   // give priority to immediate events
   for (;;) {
@@ -188,7 +211,22 @@ EThread::execute_regular()
       return;
     }
 
-    process_queue(&NegativeQueue);
+    loop_start_time = Thread::get_hrtime_updated();
+    nq_count        = 0; // count # of elements put on negative queue.
+    ev_count        = 0; // # of events handled.
+
+    current_metric = metrics + (loop_start_time / HRTIME_SECOND) % N_EVENT_METRICS;
+    if (current_metric != prev_metric) {
+      // Mixed feelings - really this shouldn't be needed, but just in case more than one entry is
+      // skipped, clear them all.
+      do {
+        memcpy((prev_metric = this->next(prev_metric)), &METRIC_INIT, sizeof(METRIC_INIT));
+      } while (current_metric != prev_metric);
+      current_metric->_loop_time._start = loop_start_time;
+    }
+    ++(current_metric->_count);
+
+    process_queue(&NegativeQueue, &ev_count, &nq_count);
 
     bool done_one;
     do {
@@ -209,7 +247,7 @@ EThread::execute_regular()
 
     // execute any negative (poll) events
     if (NegativeQueue.head) {
-      process_queue(&NegativeQueue);
+      process_queue(&NegativeQueue, &ev_count, &nq_count);
 
       // execute poll events
       while ((e = NegativeQueue.dequeue())) {
@@ -221,6 +259,7 @@ EThread::execute_regular()
     ink_hrtime sleep_time = next_time - Thread::get_hrtime_updated();
     if (sleep_time > 0) {
       sleep_time = std::min(sleep_time, HRTIME_MSECONDS(THREAD_MAX_HEARTBEAT_MSECONDS));
+      ++(current_metric->_wait);
     } else {
       sleep_time = 0;
     }
@@ -230,6 +269,29 @@ EThread::execute_regular()
     }
 
     tail_cb->waitForActivity(sleep_time);
+
+    // loop cleanup
+    loop_finish_time = this->get_hrtime_updated();
+    delta            = loop_finish_time - loop_start_time;
+
+    // This can happen due to time of day adjustments (which apparently happen quite frequently). I
+    // tried using the monotonic clock to get around this but it was *very* stuttery (up to hundreds
+    // of milliseconds), far too much to be actually used.
+    if (delta > 0) {
+      if (delta > current_metric->_loop_time._max) {
+        current_metric->_loop_time._max = delta;
+      }
+      if (delta < current_metric->_loop_time._min) {
+        current_metric->_loop_time._min = delta;
+      }
+    }
+    if (ev_count < current_metric->_events._min) {
+      current_metric->_events._min = ev_count;
+    }
+    if (ev_count > current_metric->_events._max) {
+      current_metric->_events._max = ev_count;
+    }
+    current_metric->_events._total += ev_count;
   }
 }
 
@@ -271,3 +333,40 @@ EThread::execute()
   } /* End switch */
   // coverity[missing_unlock]
 }
+
+EThread::EventMetrics &
+EThread::EventMetrics::operator+=(EventMetrics const &that)
+{
+  this->_events._max = std::max(this->_events._max, that._events._max);
+  this->_events._min = std::min(this->_events._min, that._events._min);
+  this->_events._total += that._events._total;
+  this->_loop_time._min = std::min(this->_loop_time._min, that._loop_time._min);
+  this->_loop_time._max = std::max(this->_loop_time._max, that._loop_time._max);
+  this->_count += that._count;
+  this->_wait += that._wait;
+  return *this;
+}
+
+void
+EThread::summarize_stats(EventMetrics summary[N_EVENT_TIMESCALES])
+{
+  // Accumulate in local first so each sample only needs to be processed once,
+  // not N_EVENT_TIMESCALES times.
+  EventMetrics sum;
+
+  // To avoid race conditions, we back up one from the current metric block. It's close enough
+  // and won't be updated during the time this method runs so it should be thread safe.
+  EventMetrics *m = this->prev(current_metric);
+
+  for (int t = 0; t < N_EVENT_TIMESCALES; ++t) {
+    int count = SAMPLE_COUNT[t];
+    if (t > 0)
+      count -= SAMPLE_COUNT[t - 1];
+    while (--count >= 0) {
+      if (0 != m->_loop_time._start)
+        sum += *m;
+      m = this->prev(m);
+    }
+    summary[t] += sum; // push out to return vector.
+  }
+}
diff --git a/iocore/eventsystem/UnixEventProcessor.cc b/iocore/eventsystem/UnixEventProcessor.cc
index 7fc31e7..db7d3f4 100644
--- a/iocore/eventsystem/UnixEventProcessor.cc
+++ b/iocore/eventsystem/UnixEventProcessor.cc
@@ -72,6 +72,52 @@ ThreadAffinityInitializer Thread_Affinity_Initializer;
 
 namespace
 {
+int
+EventMetricStatSync(const char *, RecDataT, RecData *, RecRawStatBlock *rsb, int)
+{
+  int id = 0;
+  EThread::EventMetrics summary[EThread::N_EVENT_TIMESCALES];
+
+  // scan the thread local values
+  for (int i = 0; i < eventProcessor.n_ethreads; ++i) {
+    eventProcessor.all_ethreads[i]->summarize_stats(summary);
+  }
+
+  ink_mutex_acquire(&(rsb->mutex));
+
+  for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx, id += EThread::N_EVENT_STATS) {
+    EThread::EventMetrics *m = summary + ts_idx;
+    // Discarding the atomic swaps for global writes, doesn't seem to actually do anything useful.
+    rsb->global[id + EThread::STAT_LOOP_COUNT]->sum   = m->_count;
+    rsb->global[id + EThread::STAT_LOOP_COUNT]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_COUNT);
+
+    rsb->global[id + EThread::STAT_LOOP_WAIT]->sum   = m->_wait;
+    rsb->global[id + EThread::STAT_LOOP_WAIT]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_WAIT);
+
+    rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->sum   = m->_loop_time._min;
+    rsb->global[id + EThread::STAT_LOOP_TIME_MIN]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MIN);
+    rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->sum   = m->_loop_time._max;
+    rsb->global[id + EThread::STAT_LOOP_TIME_MAX]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_TIME_MAX);
+
+    rsb->global[id + EThread::STAT_LOOP_EVENTS]->sum   = m->_events._total;
+    rsb->global[id + EThread::STAT_LOOP_EVENTS]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS);
+    rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->sum   = m->_events._min;
+    rsb->global[id + EThread::STAT_LOOP_EVENTS_MIN]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MIN);
+    rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->sum   = m->_events._max;
+    rsb->global[id + EThread::STAT_LOOP_EVENTS_MAX]->count = 1;
+    RecRawStatUpdateSum(rsb, id + EThread::STAT_LOOP_EVENTS_MAX);
+  }
+
+  ink_mutex_release(&(rsb->mutex));
+  return REC_ERR_OKAY;
+}
+
 /// This is a wrapper used to convert a static function into a continuation. The function pointer is
 /// passed in the cookie. For this reason the class is used as a singleton.
 /// @internal This is the implementation for @c schedule_spawn... overloads.
@@ -382,6 +428,20 @@ EventProcessor::start(int n_event_threads, size_t stacksize)
   // infrastructure being in place (e.g. the proxy allocators).
   thread_group[ET_CALL]._spawnQueue.push(make_event_for_scheduling(&Thread_Affinity_Initializer, EVENT_IMMEDIATE, nullptr));
 
+  // Get our statistics set up
+  RecRawStatBlock *rsb = RecAllocateRawStatBlock(EThread::N_EVENT_STATS * EThread::N_EVENT_TIMESCALES);
+  char name[256];
+
+  for (int ts_idx = 0; ts_idx < EThread::N_EVENT_TIMESCALES; ++ts_idx) {
+    for (int id = 0; id < EThread::N_EVENT_STATS; ++id) {
+      snprintf(name, sizeof(name), "%s.%ds", EThread::STAT_NAME[id], EThread::SAMPLE_COUNT[ts_idx]);
+      RecRegisterRawStat(rsb, RECT_PROCESS, name, RECD_INT, RECP_NON_PERSISTENT, id + (ts_idx * EThread::N_EVENT_STATS), NULL);
+    }
+  }
+
+  // Name must be that of a stat, pick one at random since we do all of them in one pass/callback.
+  RecRegisterRawStatSyncCb(name, EventMetricStatSync, rsb, 0);
+
   this->spawn_event_threads(ET_CALL, n_event_threads, stacksize);
 
   Debug("iocore_thread", "Created event thread group id %d with %d threads", ET_CALL, n_event_threads);

-- 
To stop receiving notification emails like this one, please contact
['"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>'].