You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2021/03/29 19:26:27 UTC

[trafficserver] branch 9.1.x updated: Implement log throttling (#7279)

This is an automated email from the ASF dual-hosted git repository.

zwoop pushed a commit to branch 9.1.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/9.1.x by this push:
     new c6c85be  Implement log throttling (#7279)
c6c85be is described below

commit c6c85bee7907af1770eb5d28234ce35c4bf3369e
Author: Brian Neradt <br...@verizonmedia.com>
AuthorDate: Wed Mar 24 09:33:18 2021 -0500

    Implement log throttling (#7279)
    
    Some messages get excessively noisy under high traffic conditions if
    something about their mechanism goes wrong. The pipe logging feature,
    for instance, will emit warning and error messages on every single log
    event if the reader goes down or the pipe buffer fills up. This can
    result in thousands of log messages being emitted per second, which
    makes reading the logs difficult and causes disk space issues.
    
    This commit addresses this issue by adding throttled versions of the log
    functions that can be used in the various locations where the logs have
    been deemed to be noisy.
    
    (cherry picked from commit 10f7c51a13de34842767bcb0aa6621612fbee946)
---
 doc/admin-guide/files/records.config.en.rst |  34 +++
 include/tscore/Diags.h                      | 338 +++++++---------------------
 include/tscore/{Diags.h => DiagsTypes.h}    | 117 +---------
 include/tscore/LogMessage.h                 | 145 ++++++++++++
 include/tscore/Ptr.h                        |  19 ++
 include/tscore/Throttler.h                  | 124 ++++++++++
 iocore/dns/DNS.cc                           |   4 +
 mgmt/RecordsConfig.cc                       |   4 +
 proxy/logging/Log.cc                        |   6 +-
 proxy/logging/Log.h                         |   6 +
 proxy/logging/LogBuffer.cc                  |   4 +-
 proxy/logging/LogConfig.cc                  |  43 +++-
 proxy/logging/LogFile.cc                    |   2 +-
 proxy/logging/LogObject.cc                  |   6 +-
 proxy/logging/LogStandalone.cc              |   2 -
 proxy/shared/DiagsConfig.cc                 |  56 ++---
 proxy/shared/DiagsConfig.h                  |   9 +-
 src/traffic_manager/traffic_manager.cc      |  13 +-
 src/traffic_server/traffic_server.cc        |   2 -
 src/tscore/LogMessage.cc                    | 275 ++++++++++++++++++++++
 src/tscore/Makefile.am                      |   5 +-
 src/tscore/Throttler.cc                     |  57 +++++
 src/tscore/unit_tests/test_Throttler.cc     |  55 +++++
 23 files changed, 912 insertions(+), 414 deletions(-)

diff --git a/doc/admin-guide/files/records.config.en.rst b/doc/admin-guide/files/records.config.en.rst
index 6f7529a..719efec 100644
--- a/doc/admin-guide/files/records.config.en.rst
+++ b/doc/admin-guide/files/records.config.en.rst
@@ -3007,6 +3007,29 @@ Logging Configuration
 
    How often |TS| executes log related periodic tasks, in seconds
 
+.. ts:cv:: CONFIG proxy.config.log.proxy.config.log.throttling_interval_msec INT 60000
+   :reloadable:
+   :units: milliseconds
+
+   The minimum amount of milliseconds between repeated throttled |TS| log
+   events. A value of 0 implies no throttling. Note that for performance
+   reasons only certain logs are compiled with throttling applied to them.
+
+   Throttling is applied to all log events for a particular message which is
+   emitted within its throttling interval. That is, once a throttled log is
+   emitted, none will be emitted until the next log event for that message
+   which occurs outside of this configured interval. As mentioned above, this
+   message is applied not broadly but rather to potentially noisy log messages,
+   such as ones that might occur thousands of times a second under certain
+   error conditions. Once the next log event occurs outside of its interval, a
+   summary message is printed conveying how many messages of that type were
+   throttled since the last time it was emitted.
+
+   It is possible that a log is emitted, followed by more of its type in an
+   interval, then none are emitted after that. Be aware this would result in no
+   summary log message for that interval until the message is emitted again
+   outside of the throttled interval.
+
 .. ts:cv:: CONFIG proxy.config.http.slow.log.threshold INT 0
    :reloadable:
    :units: milliseconds
@@ -3150,6 +3173,17 @@ Diagnostic Logging Configuration
    |TS| plugins will typically log debug messages using the :c:func:`TSDebug`
    API, passing the plugin name as the debug tag.
 
+.. ts:cv:: CONFIG proxy.config.diags.debug.throttling_interval_msec INT 0
+   :reloadable:
+   :units: milliseconds
+
+   The minimum amount of milliseconds between repeated |TS| `diag` and `debug`
+   log events. A value of 0 implies no throttling. All diags and debug logs
+   are compiled with throttling applied to them.
+
+   For details about how log throttling works, see
+   :ts:cv:`log.throttling_interval_msec
+   <proxy.config.log.proxy.config.log.throttling_interval_msec>`.
 
 .. ts:cv:: CONFIG proxy.config.diags.logfile_perm STRING rw-r--r--
 
diff --git a/include/tscore/Diags.h b/include/tscore/Diags.h
index fbcb290..be3e6bf 100644
--- a/include/tscore/Diags.h
+++ b/include/tscore/Diags.h
@@ -33,236 +33,9 @@
 
 #pragma once
 
-#include <cstdarg>
-#include "ink_mutex.h"
-#include "Regex.h"
-#include "ink_apidefs.h"
-#include "ContFlags.h"
-#include "ink_inet.h"
-#include "BaseLogFile.h"
+#include "DiagsTypes.h"
 #include "SourceLocation.h"
-
-#define DIAGS_MAGIC 0x12345678
-#define BYTES_IN_MB 1000000
-
-class Diags;
-
-// extern int diags_on_for_plugins;
-enum DiagsTagType {
-  DiagsTagType_Debug  = 0, // do not renumber --- used as array index
-  DiagsTagType_Action = 1
-};
-
-struct DiagsModeOutput {
-  bool to_stdout;
-  bool to_stderr;
-  bool to_syslog;
-  bool to_diagslog;
-};
-
-enum DiagsLevel { // do not renumber --- used as array index
-  DL_Diag = 0,    // process does not die
-  DL_Debug,       // process does not die
-  DL_Status,      // process does not die
-  DL_Note,        // process does not die
-  DL_Warning,     // process does not die
-  DL_Error,       // process does not die
-  DL_Fatal,       // causes process termination
-  DL_Alert,       // causes process termination
-  DL_Emergency,   // causes process termination, exits with UNRECOVERABLE_EXIT
-  DL_Undefined    // must be last, used for size!
-};
-
-enum StdStream { STDOUT = 0, STDERR };
-
-enum RollingEnabledValues { NO_ROLLING = 0, ROLL_ON_TIME, ROLL_ON_SIZE, ROLL_ON_TIME_OR_SIZE, INVALID_ROLLING_VALUE };
-
-enum DiagsShowLocation { SHOW_LOCATION_NONE = 0, SHOW_LOCATION_DEBUG, SHOW_LOCATION_ALL };
-
-#define DiagsLevel_Count DL_Undefined
-
-#define DiagsLevel_IsTerminal(_l) (((_l) >= DL_Fatal) && ((_l) < DL_Undefined))
-
-// Cleanup Function Prototype - Called before ink_fatal to
-//   cleanup process state
-typedef void (*DiagsCleanupFunc)();
-
-struct DiagsConfigState {
-  // this is static to eliminate many loads from the critical path
-  static int enabled[2];                     // one debug, one action
-  DiagsModeOutput outputs[DiagsLevel_Count]; // where each level prints
-};
-
-//////////////////////////////////////////////////////////////////////////////
-//
-//      class Diags
-//
-//      The Diags class is used for global configuration of the run-time
-//      diagnostics system.  This class provides the following services:
-//
-//      * run-time notices, debugging, warnings, errors
-//      * debugging tags to selectively enable & disable diagnostics
-//      * action tags to selectively enable & disable code paths
-//      * configurable output to stdout, stderr, syslog, error logs
-//      * traffic_manager interface supporting on-the-fly reconfiguration
-//
-//////////////////////////////////////////////////////////////////////////////
-
-class Diags
-{
-public:
-  Diags(std::string_view prefix_string, const char *base_debug_tags, const char *base_action_tags, BaseLogFile *_diags_log,
-        int diags_log_perm = -1, int output_log_perm = -1);
-  virtual ~Diags();
-
-  BaseLogFile *diags_log;
-  BaseLogFile *stdout_log;
-  BaseLogFile *stderr_log;
-
-  const unsigned int magic;
-  DiagsConfigState config;
-  DiagsShowLocation show_location;
-  DiagsCleanupFunc cleanup_func;
-
-  ///////////////////////////
-  // conditional debugging //
-  ///////////////////////////
-
-  bool
-  get_override() const
-  {
-    return get_cont_flag(ContFlags::DEBUG_OVERRIDE);
-  }
-
-  bool
-  test_override_ip(IpEndpoint const &test_ip)
-  {
-    return this->debug_client_ip == test_ip;
-  }
-
-  bool
-  on(DiagsTagType mode = DiagsTagType_Debug) const
-  {
-    return ((config.enabled[mode] == 1) || (config.enabled[mode] == 2 && this->get_override()));
-  }
-
-  bool
-  on(const char *tag, DiagsTagType mode = DiagsTagType_Debug) const
-  {
-    return this->on(mode) && tag_activated(tag, mode);
-  }
-
-  /////////////////////////////////////
-  // low-level tag inquiry functions //
-  /////////////////////////////////////
-
-  bool tag_activated(const char *tag, DiagsTagType mode = DiagsTagType_Debug) const;
-
-  /////////////////////////////
-  // raw printing interfaces //
-  /////////////////////////////
-
-  const char *level_name(DiagsLevel level) const;
-
-  ///////////////////////////////////////////////////////////////////////
-  // user diagnostic output interfaces --- enabled on or off based     //
-  // on the value of the enable flag, and the state of the debug tags. //
-  ///////////////////////////////////////////////////////////////////////
-
-  void
-  print(const char *tag, DiagsLevel level, const SourceLocation *loc, const char *fmt, ...) const TS_PRINTFLIKE(5, 6)
-  {
-    va_list ap;
-    va_start(ap, fmt);
-    print_va(tag, level, loc, fmt, ap);
-    va_end(ap);
-  }
-
-  void print_va(const char *tag, DiagsLevel level, const SourceLocation *loc, const char *fmt, va_list ap) const;
-
-  void
-  log(const char *tag, DiagsLevel level, const SourceLocation *loc, const char *fmt, ...) const TS_PRINTFLIKE(5, 6)
-  {
-    if (on(tag)) {
-      va_list ap;
-      va_start(ap, fmt);
-      print_va(tag, level, loc, fmt, ap);
-      va_end(ap);
-    }
-  }
-
-  void
-  log_va(const char *tag, DiagsLevel level, const SourceLocation *loc, const char *fmt, va_list ap)
-  {
-    if (on(tag)) {
-      print_va(tag, level, loc, fmt, ap);
-    }
-  }
-
-  void
-  error(DiagsLevel level, const SourceLocation *loc, const char *fmt, ...) const TS_PRINTFLIKE(4, 5)
-  {
-    va_list ap;
-    va_start(ap, fmt);
-    error_va(level, loc, fmt, ap);
-    va_end(ap);
-  }
-
-  virtual void error_va(DiagsLevel level, const SourceLocation *loc, const char *fmt, va_list ap) const;
-
-  void dump(FILE *fp = stdout) const;
-
-  void activate_taglist(const char *taglist, DiagsTagType mode = DiagsTagType_Debug);
-
-  void deactivate_all(DiagsTagType mode = DiagsTagType_Debug);
-
-  bool setup_diagslog(BaseLogFile *blf);
-  void config_roll_diagslog(RollingEnabledValues re, int ri, int rs);
-  void config_roll_outputlog(RollingEnabledValues re, int ri, int rs);
-  bool reseat_diagslog();
-  bool should_roll_diagslog();
-  bool should_roll_outputlog();
-
-  bool set_std_output(StdStream stream, const char *file);
-
-  const char *base_debug_tags;  // internal copy of default debug tags
-  const char *base_action_tags; // internal copy of default action tags
-
-  IpAddr debug_client_ip;
-
-private:
-  const std::string prefix_str;
-  mutable ink_mutex tag_table_lock; // prevents reconfig/read races
-  DFA *activated_tags[2];           // 1 table for debug, 1 for action
-
-  // These are the default logfile permissions
-  int diags_logfile_perm  = -1;
-  int output_logfile_perm = -1;
-
-  // log rotation variables
-  RollingEnabledValues outputlog_rolling_enabled;
-  int outputlog_rolling_size;
-  int outputlog_rolling_interval;
-  RollingEnabledValues diagslog_rolling_enabled;
-  int diagslog_rolling_interval;
-  int diagslog_rolling_size;
-  time_t outputlog_time_last_roll;
-  time_t diagslog_time_last_roll;
-
-  bool rebind_std_stream(StdStream stream, int new_fd);
-
-  void
-  lock() const
-  {
-    ink_mutex_acquire(&tag_table_lock);
-  }
-
-  void
-  unlock() const
-  {
-    ink_mutex_release(&tag_table_lock);
-  }
-};
+#include "LogMessage.h"
 
 //////////////////////////////////////////////////////////////////////////
 //                                                                      //
@@ -285,10 +58,14 @@ private:
 
 extern inkcoreapi Diags *diags;
 
-#define DiagsError(level, fmt, ...)                \
-  do {                                             \
-    SourceLocation loc = MakeSourceLocation();     \
-    diags->error(level, &loc, fmt, ##__VA_ARGS__); \
+// Note that the log functions being implemented as a macro has the advantage
+// that the pre-compiler expands this in place such that the call to
+// MakeSourceLocation happens at the call site for the function.
+#define DiagsError(level, ...)                              \
+  do {                                                      \
+    static const SourceLocation loc = MakeSourceLocation(); \
+    static LogMessage log_message;                          \
+    log_message.message(level, loc, __VA_ARGS__);           \
   } while (0)
 
 #define Status(...) DiagsError(DL_Status, __VA_ARGS__)       // Log information
@@ -299,10 +76,40 @@ extern inkcoreapi Diags *diags;
 #define Alert(...) DiagsError(DL_Alert, __VA_ARGS__)         // Log recoverable crash, fail CI, exit & restart, Ops attention
 #define Emergency(...) DiagsError(DL_Emergency, __VA_ARGS__) // Log unrecoverable crash, fail CI, exit, Ops attention
 
-#define DiagsErrorV(level, fmt, ap)                  \
-  do {                                               \
-    const SourceLocation loc = MakeSourceLocation(); \
-    diags->error_va(level, &loc, fmt, ap);           \
+/** Apply throttling to a log site.
+ *
+ * Logs using SiteThrottled* version will be throttled at a certain interval
+ * that applies to the call site, regardless of whether the messages within
+ * that interval are unique or not. This is helpful for logs which can be noisy
+ * and frequently have differing content, such as the length of a buffer or a
+ * counter. Rather than changing the log to contain less information, this can
+ * be applied to the site so that when it is emitted, the information is
+ * present, but the set of possibly slightly different logs will still be
+ * suppressed against a configurable interval as a whole.
+ */
+#define SiteThrottledDiagsError(level, ...)                 \
+  do {                                                      \
+    static const SourceLocation loc = MakeSourceLocation(); \
+    static LogMessage log_message{IS_THROTTLED};            \
+    log_message.message(level, loc, __VA_ARGS__);           \
+  } while (0)
+
+#define SiteThrottledStatus(...) SiteThrottledDiagsError(DL_Status, __VA_ARGS__)   // Log information
+#define SiteThrottledNote(...) SiteThrottledDiagsError(DL_Note, __VA_ARGS__)       // Log significant information
+#define SiteThrottledWarning(...) SiteThrottledDiagsError(DL_Warning, __VA_ARGS__) // Log concerning information
+#define SiteThrottledError(...) SiteThrottledDiagsError(DL_Error, __VA_ARGS__)     // Log operational failure, fail CI
+#define SiteThrottledFatal(...) \
+  SiteThrottledDiagsError(DL_Fatal, __VA_ARGS__) // Log recoverable crash, fail CI, exit & allow restart
+#define SiteThrottledAlert(...) \
+  SiteThrottledDiagsError(DL_Alert, __VA_ARGS__) // Log recoverable crash, fail CI, exit & restart, Ops attention
+#define SiteThrottledEmergency(...) \
+  SiteThrottledDiagsError(DL_Emergency, __VA_ARGS__) // Log unrecoverable crash, fail CI, exit, Ops attention
+
+#define DiagsErrorV(level, fmt, ap)                         \
+  do {                                                      \
+    static const SourceLocation loc = MakeSourceLocation(); \
+    static LogMessage log_message;                          \
+    log_message.message_va(level, loc, fmt, ap);            \
   } while (0)
 
 #define StatusV(fmt, ap) DiagsErrorV(DL_Status, fmt, ap)
@@ -313,29 +120,58 @@ extern inkcoreapi Diags *diags;
 #define AlertV(fmt, ap) DiagsErrorV(DL_Alert, fmt, ap)
 #define EmergencyV(fmt, ap) DiagsErrorV(DL_Emergency, fmt, ap)
 
+/** See the comment above SiteThrottledDiagsError for an explanation of how the
+ * SiteThrottled functions behave. */
+#define SiteThrottledDiagsErrorV(level, fmt, ap)            \
+  do {                                                      \
+    static const SourceLocation loc = MakeSourceLocation(); \
+    static LogMessage log_message{IS_THROTTLED};            \
+    log_message.message_va(level, loc, fmt, ap);            \
+  } while (0)
+
+#define SiteThrottledStatusV(fmt, ap) SiteThrottledDiagsErrorV(DL_Status, fmt, ap)
+#define SiteThrottledNoteV(fmt, ap) SiteThrottledDiagsErrorV(DL_Note, fmt, ap)
+#define SiteThrottledWarningV(fmt, ap) SiteThrottledDiagsErrorV(DL_Warning, fmt, ap)
+#define SiteThrottledErrorV(fmt, ap) SiteThrottledDiagsErrorV(DL_Error, fmt, ap)
+#define SiteThrottledFatalV(fmt, ap) SiteThrottledDiagsErrorV(DL_Fatal, fmt, ap)
+#define SiteThrottledAlertV(fmt, ap) SiteThrottledDiagsErrorV(DL_Alert, fmt, ap)
+#define SiteThrottledEmergencyV(fmt, ap) SiteThrottledDiagsErrorV(DL_Emergency, fmt, ap)
+
 #if TS_USE_DIAGS
 
-#define Diag(tag, ...)                                 \
-  do {                                                 \
-    if (unlikely(diags->on())) {                       \
-      const SourceLocation loc = MakeSourceLocation(); \
-      diags->log(tag, DL_Diag, &loc, __VA_ARGS__);     \
-    }                                                  \
+/// A Diag version of the above.
+#define Diag(tag, ...)                                        \
+  do {                                                        \
+    if (unlikely(diags->on())) {                              \
+      static const SourceLocation loc = MakeSourceLocation(); \
+      static LogMessage log_message;                          \
+      log_message.diag(tag, loc, __VA_ARGS__);                \
+    }                                                         \
   } while (0)
 
-#define Debug(tag, ...)                                \
-  do {                                                 \
-    if (unlikely(diags->on())) {                       \
-      const SourceLocation loc = MakeSourceLocation(); \
-      diags->log(tag, DL_Debug, &loc, __VA_ARGS__);    \
-    }                                                  \
+/// A Debug version of the above.
+#define Debug(tag, ...)                                       \
+  do {                                                        \
+    if (unlikely(diags->on())) {                              \
+      static const SourceLocation loc = MakeSourceLocation(); \
+      static LogMessage log_message;                          \
+      log_message.debug(tag, loc, __VA_ARGS__);               \
+    }                                                         \
   } while (0)
 
+/** Same as Debug above, but this allows a positive override of the tag
+ * mechanism by a flag boolean.
+ *
+ * @param[in] flag True if the message should be logged regardless of tag
+ * configuration, false if the logging of the message should respsect the tag
+ * configuration.
+ */
 #define SpecificDebug(flag, tag, ...)                                                                       \
   do {                                                                                                      \
     if (unlikely(diags->on())) {                                                                            \
-      const SourceLocation loc = MakeSourceLocation();                                                      \
-      flag ? diags->print(tag, DL_Debug, &loc, __VA_ARGS__) : diags->log(tag, DL_Debug, &loc, __VA_ARGS__); \
+      static const SourceLocation loc = MakeSourceLocation();                                               \
+      static LogMessage log_message;                                                                        \
+      flag ? log_message.print(tag, DL_Debug, loc, __VA_ARGS__) : log_message.debug(tag, loc, __VA_ARGS__); \
     }                                                                                                       \
   } while (0)
 
diff --git a/include/tscore/Diags.h b/include/tscore/DiagsTypes.h
similarity index 60%
copy from include/tscore/Diags.h
copy to include/tscore/DiagsTypes.h
index fbcb290..8e98242 100644
--- a/include/tscore/Diags.h
+++ b/include/tscore/DiagsTypes.h
@@ -1,6 +1,6 @@
 /** @file
 
-  A brief file description
+  Diags type declarations.
 
   @section license License
 
@@ -23,30 +23,28 @@
 
 /****************************************************************************
 
-  Diags.h
+  DiagsTypes.h
 
-  This file contains code to manipulate run-time diagnostics, and print
-  warnings and errors at runtime.  Action tags and debugging tags are
-  supported, allowing run-time conditionals affecting diagnostics.
+  This file contains the type declarations for Diags logging.
 
  ****************************************************************************/
 
 #pragma once
 
 #include <cstdarg>
-#include "ink_mutex.h"
-#include "Regex.h"
-#include "ink_apidefs.h"
+#include <string>
+#include <string_view>
+#include "BaseLogFile.h"
 #include "ContFlags.h"
+#include "ink_apidefs.h"
 #include "ink_inet.h"
-#include "BaseLogFile.h"
+#include "ink_mutex.h"
+#include "Regex.h"
 #include "SourceLocation.h"
 
 #define DIAGS_MAGIC 0x12345678
 #define BYTES_IN_MB 1000000
 
-class Diags;
-
 // extern int diags_on_for_plugins;
 enum DiagsTagType {
   DiagsTagType_Debug  = 0, // do not renumber --- used as array index
@@ -169,6 +167,7 @@ public:
   // on the value of the enable flag, and the state of the debug tags. //
   ///////////////////////////////////////////////////////////////////////
 
+  /// Print the log message without respect to whether the tag is enabled.
   void
   print(const char *tag, DiagsLevel level, const SourceLocation *loc, const char *fmt, ...) const TS_PRINTFLIKE(5, 6)
   {
@@ -180,6 +179,7 @@ public:
 
   void print_va(const char *tag, DiagsLevel level, const SourceLocation *loc, const char *fmt, va_list ap) const;
 
+  /// Print the log message only if tag is enabled.
   void
   log(const char *tag, DiagsLevel level, const SourceLocation *loc, const char *fmt, ...) const TS_PRINTFLIKE(5, 6)
   {
@@ -263,98 +263,3 @@ private:
     ink_mutex_release(&tag_table_lock);
   }
 };
-
-//////////////////////////////////////////////////////////////////////////
-//                                                                      //
-//      Macros                                                          //
-//                                                                      //
-//      The following are diagnostic macros that wrap up the compiler   //
-//      __FILE__, __FUNCTION__, and __LINE__ macros into closures       //
-//      and then invoke the closure on the remaining arguments.         //
-//                                                                      //
-//      This closure hack is done, because the cpp preprocessor doesn't //
-//      support manipulation and union of varargs parameters.           //
-//                                                                      //
-//////////////////////////////////////////////////////////////////////////
-
-#if !defined(__GNUC__)
-#ifndef __FUNCTION__
-#define __FUNCTION__ nullptr
-#endif
-#endif
-
-extern inkcoreapi Diags *diags;
-
-#define DiagsError(level, fmt, ...)                \
-  do {                                             \
-    SourceLocation loc = MakeSourceLocation();     \
-    diags->error(level, &loc, fmt, ##__VA_ARGS__); \
-  } while (0)
-
-#define Status(...) DiagsError(DL_Status, __VA_ARGS__)       // Log information
-#define Note(...) DiagsError(DL_Note, __VA_ARGS__)           // Log significant information
-#define Warning(...) DiagsError(DL_Warning, __VA_ARGS__)     // Log concerning information
-#define Error(...) DiagsError(DL_Error, __VA_ARGS__)         // Log operational failure, fail CI
-#define Fatal(...) DiagsError(DL_Fatal, __VA_ARGS__)         // Log recoverable crash, fail CI, exit & allow restart
-#define Alert(...) DiagsError(DL_Alert, __VA_ARGS__)         // Log recoverable crash, fail CI, exit & restart, Ops attention
-#define Emergency(...) DiagsError(DL_Emergency, __VA_ARGS__) // Log unrecoverable crash, fail CI, exit, Ops attention
-
-#define DiagsErrorV(level, fmt, ap)                  \
-  do {                                               \
-    const SourceLocation loc = MakeSourceLocation(); \
-    diags->error_va(level, &loc, fmt, ap);           \
-  } while (0)
-
-#define StatusV(fmt, ap) DiagsErrorV(DL_Status, fmt, ap)
-#define NoteV(fmt, ap) DiagsErrorV(DL_Note, fmt, ap)
-#define WarningV(fmt, ap) DiagsErrorV(DL_Warning, fmt, ap)
-#define ErrorV(fmt, ap) DiagsErrorV(DL_Error, fmt, ap)
-#define FatalV(fmt, ap) DiagsErrorV(DL_Fatal, fmt, ap)
-#define AlertV(fmt, ap) DiagsErrorV(DL_Alert, fmt, ap)
-#define EmergencyV(fmt, ap) DiagsErrorV(DL_Emergency, fmt, ap)
-
-#if TS_USE_DIAGS
-
-#define Diag(tag, ...)                                 \
-  do {                                                 \
-    if (unlikely(diags->on())) {                       \
-      const SourceLocation loc = MakeSourceLocation(); \
-      diags->log(tag, DL_Diag, &loc, __VA_ARGS__);     \
-    }                                                  \
-  } while (0)
-
-#define Debug(tag, ...)                                \
-  do {                                                 \
-    if (unlikely(diags->on())) {                       \
-      const SourceLocation loc = MakeSourceLocation(); \
-      diags->log(tag, DL_Debug, &loc, __VA_ARGS__);    \
-    }                                                  \
-  } while (0)
-
-#define SpecificDebug(flag, tag, ...)                                                                       \
-  do {                                                                                                      \
-    if (unlikely(diags->on())) {                                                                            \
-      const SourceLocation loc = MakeSourceLocation();                                                      \
-      flag ? diags->print(tag, DL_Debug, &loc, __VA_ARGS__) : diags->log(tag, DL_Debug, &loc, __VA_ARGS__); \
-    }                                                                                                       \
-  } while (0)
-
-#define is_debug_tag_set(_t) unlikely(diags->on(_t, DiagsTagType_Debug))
-#define is_action_tag_set(_t) unlikely(diags->on(_t, DiagsTagType_Action))
-#define debug_tag_assert(_t, _a) (is_debug_tag_set(_t) ? (ink_release_assert(_a), 0) : 0)
-#define action_tag_assert(_t, _a) (is_action_tag_set(_t) ? (ink_release_assert(_a), 0) : 0)
-#define is_diags_on(_t) unlikely(diags->on(_t))
-
-#else // TS_USE_DIAGS
-
-#define Diag(tag, fmt, ...)
-#define Debug(tag, fmt, ...)
-#define SpecificDebug(flag, tag, ...)
-
-#define is_debug_tag_set(_t) 0
-#define is_action_tag_set(_t) 0
-#define debug_tag_assert(_t, _a)  /**/
-#define action_tag_assert(_t, _a) /**/
-#define is_diags_on(_t) 0
-
-#endif // TS_USE_DIAGS
diff --git a/include/tscore/LogMessage.h b/include/tscore/LogMessage.h
new file mode 100644
index 0000000..69918e6
--- /dev/null
+++ b/include/tscore/LogMessage.h
@@ -0,0 +1,145 @@
+/** @file
+
+  LogMessage declaration.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#pragma once
+
+#include "DiagsTypes.h"
+#include "SourceLocation.h"
+#include "Throttler.h"
+
+#include <atomic>
+#include <chrono>
+#include <functional>
+
+constexpr const bool IS_THROTTLED = true;
+
+/** A class implementing stateful logging behavior. */
+class LogMessage : public Throttler
+{
+public:
+  /** Create a LogMessage, optionally with throttling applied to it.
+   *
+   * If configured with throttling, the system's throttling value will be used
+   * and the throttling value will dynamically change as the user configures
+   * different values for throttling.
+   *
+   * @param[in] is_throttled Whether to apply throttling to the message. If
+   * true, the system default log throttling interval will be used.
+   */
+  LogMessage(bool is_throttled = false);
+
+  /** Create a LogMessage with an explicit throttling interval.
+   *
+   * For this message, throttling will be configured with the designated amount
+   * and will not change as the system's configured throttling interval
+   * changes.
+   *
+   * @param[in] throttling_interval The minimum number of desired
+   * milliseconds between log events. 0 implies no throttling.
+   */
+  LogMessage(std::chrono::milliseconds throttling_interval);
+
+  /* TODO: Add BufferWriter overloads for these. */
+  void diag(const char *tag, SourceLocation const &loc, const char *fmt, ...);
+  void debug(const char *tag, SourceLocation const &loc, const char *fmt, ...);
+  void status(SourceLocation const &loc, const char *fmt, ...);
+  void note(SourceLocation const &loc, const char *fmt, ...);
+  void warning(SourceLocation const &loc, const char *fmt, ...);
+  void error(SourceLocation const &loc, const char *fmt, ...);
+  void fatal(SourceLocation const &loc, const char *fmt, ...);
+  void alert(SourceLocation const &loc, const char *fmt, ...);
+  void emergency(SourceLocation const &loc, const char *fmt, ...);
+
+  void message(DiagsLevel level, SourceLocation const &loc, const char *fmt, ...);
+  void print(const char *tag, DiagsLevel level, SourceLocation const &loc, const char *fmt, ...);
+
+  void diag_va(const char *tag, SourceLocation const &loc, const char *fmt, va_list args);
+  void debug_va(const char *tag, SourceLocation const &loc, const char *fmt, va_list args);
+  void status_va(SourceLocation const &loc, const char *fmt, va_list args);
+  void note_va(SourceLocation const &loc, const char *fmt, va_list args);
+  void warning_va(SourceLocation const &loc, const char *fmt, va_list args);
+  void error_va(SourceLocation const &loc, const char *fmt, va_list args);
+  void fatal_va(SourceLocation const &loc, const char *fmt, va_list args);
+  void alert_va(SourceLocation const &loc, const char *fmt, va_list args);
+  void emergency_va(SourceLocation const &loc, const char *fmt, va_list args);
+  void message_va(DiagsLevel level, SourceLocation const &loc, const char *fmt, va_list args);
+
+  /** Set a new system-wide default log throttling interval.
+   *
+   * @param[in] new_interval The new log throttling interval.
+   */
+  static void set_default_log_throttling_interval(std::chrono::milliseconds new_interval);
+
+  /** Set a new system-wide default debug log throttling interval.
+   *
+   * @param[in] new_interval The new debug log throttling interval.
+   */
+  static void set_default_debug_throttling_interval(std::chrono::milliseconds new_interval);
+
+private:
+  using log_function_f = std::function<void(const char *fmt, va_list args)>;
+
+  /** Encapsulate common message handling logic in a helper function.
+   *
+   * @param[in] current_configured_interval The applicable log throttling
+   * interval for this message.
+   *
+   * @param[in] log_function The function to use to emit the log message if it
+   * is not throttled.
+   *
+   * @param[in] fmt The format string for the log message.
+   *
+   * @param[in] args The parameters for the above format string.
+   */
+  void message_helper(std::chrono::microseconds current_configured_interval, log_function_f log_function, const char *fmt,
+                      va_list args);
+
+  /** Message handling for non-debug logs. */
+  void standard_message_helper(DiagsLevel level, SourceLocation const &loc, const char *fmt, va_list args);
+
+  /** Same as above, but catered for the diag and debug variants.
+   *
+   * Note that this uses the diags-log variant which takes a debug tag.
+   */
+  void message_debug_helper(const char *tag, DiagsLevel level, SourceLocation const &loc, const char *fmt, va_list args);
+
+  /** Same as above, but uses the tag-ignoring diags->print variant. */
+  void message_print_helper(const char *tag, DiagsLevel level, SourceLocation const &loc, const char *fmt, va_list args);
+
+private:
+  /** Whether the throttling value was explicitly set by the user.
+   *
+   * If the user explicitly set a throttling value, then it will not change as
+   * the configured log throttling values change.
+   */
+  bool const _throttling_value_is_explicitly_set;
+
+  /** Whether throttling should be applied to this message. */
+  bool const _is_throttled;
+
+  /** The configured, system-wide default log throttling value. */
+  static std::atomic<std::chrono::milliseconds> _default_log_throttling_interval;
+
+  /** The configured, system-wide default debug log throttling value. */
+  static std::atomic<std::chrono::milliseconds> _default_debug_throttling_interval;
+};
diff --git a/include/tscore/Ptr.h b/include/tscore/Ptr.h
index 3e93477..23f0a17 100644
--- a/include/tscore/Ptr.h
+++ b/include/tscore/Ptr.h
@@ -99,10 +99,12 @@ template <class T> class Ptr
 public:
   explicit Ptr(T *p = nullptr);
   Ptr(const Ptr<T> &);
+  Ptr(Ptr<T> &&);
   ~Ptr();
 
   void clear();
   Ptr<T> &operator=(const Ptr<T> &);
+  Ptr<T> &operator=(Ptr<T> &&);
   Ptr<T> &operator=(T *);
 
   T *
@@ -209,6 +211,11 @@ template <class T> inline Ptr<T>::Ptr(const Ptr<T> &src) : m_ptr(src.m_ptr)
   }
 }
 
+template <class T> inline Ptr<T>::Ptr(Ptr<T> &&src) : m_ptr(src.m_ptr)
+{
+  src.m_ptr = nullptr;
+}
+
 template <class T> inline Ptr<T>::~Ptr()
 {
   if (m_ptr && m_ptr->refcount_dec() == 0) {
@@ -257,6 +264,18 @@ Ptr<T>::operator=(const Ptr<T> &src)
   return (operator=(src.m_ptr));
 }
 
+template <class T>
+inline Ptr<T> &
+Ptr<T>::operator=(Ptr<T> &&src)
+{
+  if (this != &src) {
+    this->~Ptr();
+    m_ptr     = src.m_ptr;
+    src.m_ptr = nullptr;
+  }
+  return *this;
+}
+
 // Bit of subtly here for the flipped version of equality checks
 // With only the template versions, the compiler will try to substitute @c nullptr_t
 // for @c T and fail, because that's not the type and no operator will be found.
diff --git a/include/tscore/Throttler.h b/include/tscore/Throttler.h
new file mode 100644
index 0000000..974861b
--- /dev/null
+++ b/include/tscore/Throttler.h
@@ -0,0 +1,124 @@
+/** @file
+
+  A class for generic throttling.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+
+/** A class that exposes an interface for generic throttling of some action
+ * against a certain interval.
+ *
+ * To use:
+ *
+ * 1. Create an instance of this class specifying the interval for which
+ * something should be throttled. Alternatively, inherit from this class to
+ * have the throttling interface apply to the object you want throttling for.
+ *
+ * 2. Prepend each decision for a given throttled action with a call to
+ * is_throttled.
+ *
+ *   2a. If the is_throttled is false, then at least the configured number of
+ *   microseconds has elapsed since the previous call in which is_throttled
+ *   returned false. The number of times the check has been called between
+ *   these two times is provided in the suppressed_count output parameter.
+ *
+ *   2b. If is_throttled returns returns true, then not enough time has elapsed
+ *   since the last time the operation returned true per the throttling
+ *   interval. Thus the operation should be skipped or suppressed, depending
+ *   upon the context.
+ *
+ * For instance:
+ *
+ *    void foo()
+ *    {
+ *      using namespace std::chrono_literals;
+ *      static Throttler t(300ms);
+ *      uint64_t suppressed_count;
+ *      if (!t.is_throttled(suppressed_count)) {
+ *        std::printf("Alan bought another monitor\n");
+ *        std::printf("We ignored Alan buying a monitor %llu times\n", suppressed_count);
+ *      }
+ *    }
+ */
+class Throttler
+{
+public:
+  virtual ~Throttler() = default;
+
+  /**
+   * @param[in] interval The minimum number of microseconds between
+   * calls to Throttler which should return true.
+   */
+  Throttler(std::chrono::microseconds interval);
+
+  /** Whether the current event should be suppressed because the time since the
+   * last unsuppressed event is less than the throttling interval.
+   *
+   * @param[out] suppressed_count If the return of this call is false (the action
+   * should not be suppressed), this is populated with the approximate number
+   * of suppressed events between the last unsuppressed event and the current
+   * one.  Otherwise the value is not set. This value is approximate because,
+   * if used in a multithreaded context, other threads may be querrying against
+   * this function as well concurrently, and their count may not be applied
+   * depending upon the timing of their query.
+   *
+   * @return True if the action is suppressed per the configured interval,
+   * false otherwise.
+   */
+  virtual bool is_throttled(uint64_t &suppressed_count);
+
+  /** Set the log throttling interval to a new value.
+   *
+   * @param[in] interval The new interval to set.
+   */
+  virtual void set_throttling_interval(std::chrono::microseconds new_interval);
+
+  /** Manually reset the throttling counter to the current time.
+   *
+   * @return the number of messages skipped since the previous positive return
+   * of the functor operator.
+   */
+  virtual uint64_t reset_counter();
+
+private:
+  /// Base clock.
+  using Clock = std::chrono::system_clock;
+  /// Time point type, based on the clock to be used.
+  using TimePoint = Clock::time_point;
+
+  /// Time that the last item was emitted.
+  // It is strange that we need to explicitly default construct this with a
+  // default constructed TimePoint. Without it, however, I get a compiler error
+  // in gcc 8.x and 9.x.  Playing around in godbolt I notice that neither clang
+  // nor gcc versions starting from 10.x require this, so I suspect it is a
+  // compiler bug.
+  std::atomic<TimePoint> _last_allowed_time{TimePoint{}};
+
+  /// The minimum number of microseconds desired between actions.
+  std::atomic<std::chrono::microseconds> _interval{std::chrono::microseconds{0}};
+
+  /// The number of calls to Throttler since the last
+  uint64_t _suppressed_count = 0;
+};
diff --git a/iocore/dns/DNS.cc b/iocore/dns/DNS.cc
index 95a913b..a2a02bf 100644
--- a/iocore/dns/DNS.cc
+++ b/iocore/dns/DNS.cc
@@ -115,6 +115,10 @@ ink_get16(const uint8_t *src)
 static inline unsigned int
 get_rcode(char *buff)
 {
+  // 'buff' is always a HostEnt::buf which is a char array and therefore cannot
+  // be a nullptr. This assertion satisfies a mistaken clang-analyzer warning
+  // saying this can be a nullptr dereference.
+  ink_assert(buff != nullptr);
   return reinterpret_cast<HEADER *>(buff)->rcode;
 }
 
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index 556e0e1..52309ef 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -198,6 +198,8 @@ static const RecordElement RecordsConfig[] =
   ,
   {RECT_CONFIG, "proxy.config.diags.debug.tags", RECD_STRING, "http|dns", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
   ,
+  {RECT_CONFIG, "proxy.config.diags.debug.throttling_interval_msec", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, "^[0-9]+$", RECA_NULL}
+  ,
   {RECT_CONFIG, "proxy.config.diags.debug.client_ip", RECD_STRING, nullptr, RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
   ,
   {RECT_CONFIG, "proxy.config.diags.action.enabled", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_NULL, nullptr, RECA_NULL}
@@ -1044,6 +1046,8 @@ static const RecordElement RecordsConfig[] =
   // How often periodic tasks get executed in the Log.cc infrastructure
   {RECT_CONFIG, "proxy.config.log.periodic_tasks_interval", RECD_INT, "5", RECU_DYNAMIC, RR_NULL, RECC_NULL, "^[0-9]+$", RECA_NULL}
   ,
+  {RECT_CONFIG, "proxy.config.log.throttling_interval_msec", RECD_INT, "60000", RECU_DYNAMIC, RR_NULL, RECC_NULL, "^[0-9]+$", RECA_NULL}
+  ,
 
   //##############################################################################
   //#
diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
index c5965e8..56c373d 100644
--- a/proxy/logging/Log.cc
+++ b/proxy/logging/Log.cc
@@ -1386,7 +1386,7 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */)
       // make sure we're open & ready to write
       logfile->check_fd();
       if (!logfile->is_open()) {
-        Warning("File:%s was closed, have dropped (%d) bytes.", logfile->get_name(), total_bytes);
+        SiteThrottledWarning("File:%s was closed, have dropped (%d) bytes.", logfile->get_name(), total_bytes);
 
         RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_bytes_lost_before_written_to_disk_stat, total_bytes);
         delete fdata;
@@ -1412,8 +1412,8 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */)
         len = ::write(logfilefd, &buf[bytes_written], total_bytes - bytes_written);
 
         if (len < 0) {
-          Error("Failed to write log to %s: [tried %d, wrote %d, %s]", logfile->get_name(), total_bytes - bytes_written,
-                bytes_written, strerror(errno));
+          SiteThrottledError("Failed to write log to %s: [tried %d, wrote %d, %s]", logfile->get_name(),
+                             total_bytes - bytes_written, bytes_written, strerror(errno));
 
           RecIncrRawStat(log_rsb, mutex->thread_holding, log_stat_bytes_lost_before_written_to_disk_stat,
                          total_bytes - bytes_written);
diff --git a/proxy/logging/Log.h b/proxy/logging/Log.h
index 50c4323..e0b502b 100644
--- a/proxy/logging/Log.h
+++ b/proxy/logging/Log.h
@@ -232,6 +232,12 @@ private:
 };
 
 static inline bool
+LogThrottlingIsValid(int throttling_val)
+{
+  return throttling_val >= 0;
+}
+
+static inline bool
 LogRollingEnabledIsValid(int enabled)
 {
   return (enabled >= Log::NO_ROLLING || enabled < Log::INVALID_ROLLING_VALUE);
diff --git a/proxy/logging/LogBuffer.cc b/proxy/logging/LogBuffer.cc
index 466e34f..8fdfb8c 100644
--- a/proxy/logging/LogBuffer.cc
+++ b/proxy/logging/LogBuffer.cc
@@ -504,7 +504,7 @@ LogBuffer::resolve_custom_entry(LogFieldList *fieldlist, char *printf_str, char
         res      = field->unmarshal(&read_from, to, write_to_len - bytes_written);
 
         if (res < 0) {
-          Note("%s", buffer_size_exceeded_msg);
+          SiteThrottledNote("%s", buffer_size_exceeded_msg);
           bytes_written = 0;
           break;
         }
@@ -529,7 +529,7 @@ LogBuffer::resolve_custom_entry(LogFieldList *fieldlist, char *printf_str, char
       if (1 + bytes_written < write_to_len) {
         write_to[bytes_written++] = printf_str[i];
       } else {
-        Note("%s", buffer_size_exceeded_msg);
+        SiteThrottledNote("%s", buffer_size_exceeded_msg);
         bytes_written = 0;
         break;
       }
diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc
index 1a77f0d..7676a6e 100644
--- a/proxy/logging/LogConfig.cc
+++ b/proxy/logging/LogConfig.cc
@@ -33,8 +33,9 @@
 #include "tscore/ink_platform.h"
 #include "tscore/ink_file.h"
 
-#include "tscore/List.h"
 #include "tscore/Filenames.h"
+#include "tscore/List.h"
+#include "tscore/LogMessage.h"
 
 #include "Log.h"
 #include "LogField.h"
@@ -202,6 +203,20 @@ LogConfig::read_configuration_variables()
   val                 = static_cast<int>(REC_ConfigReadInteger("proxy.config.log.rolling_allow_empty"));
   rolling_allow_empty = (val > 0);
 
+  // THROTTLING
+  val = static_cast<int>(REC_ConfigReadInteger("proxy.config.log.throttling_interval_msec"));
+  if (LogThrottlingIsValid(val)) {
+    LogMessage::set_default_log_throttling_interval(std::chrono::milliseconds{val});
+  } else {
+    Warning("invalid value '%d' for '%s', disabling log rolling", val, "proxy.config.log.throttling_interval_msec");
+  }
+  val = static_cast<int>(REC_ConfigReadInteger("proxy.config.diags.debug.throttling_interval_msec"));
+  if (LogThrottlingIsValid(val)) {
+    LogMessage::set_default_debug_throttling_interval(std::chrono::milliseconds{val});
+  } else {
+    Warning("invalid value '%d' for '%s', disabling log rolling", val, "proxy.config.diags.debug.throttling_interval_msec");
+  }
+
   // Read in min_count control values for auto deletion
   if (auto_delete_rolled_files) {
     // The majority of register_rolled_log_auto_delete() updates come in
@@ -435,13 +450,27 @@ void
 LogConfig::register_config_callbacks()
 {
   static const char *names[] = {
-    "proxy.config.log.log_buffer_size",       "proxy.config.log.max_secs_per_buffer", "proxy.config.log.max_space_mb_for_logs",
-    "proxy.config.log.max_space_mb_headroom", "proxy.config.log.logfile_perm",        "proxy.config.log.hostname",
-    "proxy.config.log.logfile_dir",           "proxy.config.log.rolling_enabled",     "proxy.config.log.rolling_interval_sec",
-    "proxy.config.log.rolling_offset_hr",     "proxy.config.log.rolling_size_mb",     "proxy.config.log.auto_delete_rolled_files",
-    "proxy.config.log.rolling_max_count",     "proxy.config.log.rolling_allow_empty", "proxy.config.log.config.filename",
-    "proxy.config.log.sampling_frequency",    "proxy.config.log.file_stat_frequency", "proxy.config.log.space_used_frequency",
+    "proxy.config.log.log_buffer_size",
+    "proxy.config.log.max_secs_per_buffer",
+    "proxy.config.log.max_space_mb_for_logs",
+    "proxy.config.log.max_space_mb_headroom",
+    "proxy.config.log.logfile_perm",
+    "proxy.config.log.hostname",
+    "proxy.config.log.logfile_dir",
+    "proxy.config.log.rolling_enabled",
+    "proxy.config.log.rolling_interval_sec",
+    "proxy.config.log.rolling_offset_hr",
+    "proxy.config.log.rolling_size_mb",
+    "proxy.config.log.auto_delete_rolled_files",
+    "proxy.config.log.rolling_max_count",
+    "proxy.config.log.rolling_allow_empty",
+    "proxy.config.log.config.filename",
+    "proxy.config.log.sampling_frequency",
+    "proxy.config.log.file_stat_frequency",
+    "proxy.config.log.space_used_frequency",
     "proxy.config.log.io.max_buffer_index",
+    "proxy.config.log.throttling_interval_msec",
+    "proxy.config.diags.debug.throttling_interval_msec",
   };
 
   for (unsigned i = 0; i < countof(names); ++i) {
diff --git a/proxy/logging/LogFile.cc b/proxy/logging/LogFile.cc
index 8998776..7f5ca6c 100644
--- a/proxy/logging/LogFile.cc
+++ b/proxy/logging/LogFile.cc
@@ -718,7 +718,7 @@ LogFile::writeln(char *data, int len, int fd, const char *path)
     }
 
     if ((bytes_this_write = static_cast<int>(::writev(fd, (const struct iovec *)wvec, vcnt))) < 0) {
-      Warning("An error was encountered in writing to %s: %s.", ((path) ? path : "logfile"), strerror(errno));
+      SiteThrottledWarning("An error was encountered in writing to %s: %s.", ((path) ? path : "logfile"), strerror(errno));
     } else {
       total_bytes = bytes_this_write;
     }
diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
index 82230b0..9b8ec99 100644
--- a/proxy/logging/LogObject.cc
+++ b/proxy/logging/LogObject.cc
@@ -547,9 +547,9 @@ LogObject::log(LogAccess *lad, std::string_view text_entry)
   buffer = _checkout_write(&offset, bytes_needed);
 
   if (!buffer) {
-    Note("Skipping the current log entry for %s because its size (%zu) exceeds "
-         "the maximum payload space in a log buffer",
-         m_basename, bytes_needed);
+    SiteThrottledNote("Skipping the current log entry for %s because its size (%zu) exceeds "
+                      "the maximum payload space in a log buffer",
+                      m_basename, bytes_needed);
     return Log::FAIL;
   }
   //
diff --git a/proxy/logging/LogStandalone.cc b/proxy/logging/LogStandalone.cc
index b893cc5..620ea39 100644
--- a/proxy/logging/LogStandalone.cc
+++ b/proxy/logging/LogStandalone.cc
@@ -209,7 +209,6 @@ init_log_standalone(const char *pgm_name, bool one_copy)
   init_system(true);
   initialize_process_manager();
   diagsConfig = new DiagsConfig(pgm_name, logfile, error_tags, action_tags);
-  diags       = diagsConfig->diags;
 }
 
 /*-------------------------------------------------------------------------
@@ -237,7 +236,6 @@ init_log_standalone_basic(const char *pgm_name)
   init_system(false);
   const bool use_records = false;
   diagsConfig            = new DiagsConfig(pgm_name, logfile, error_tags, action_tags, use_records);
-  diags                  = diagsConfig->diags;
   // set stdin/stdout to be unbuffered
   //
   setbuf(stdin, nullptr);
diff --git a/proxy/shared/DiagsConfig.cc b/proxy/shared/DiagsConfig.cc
index 4301c01..305330c 100644
--- a/proxy/shared/DiagsConfig.cc
+++ b/proxy/shared/DiagsConfig.cc
@@ -67,8 +67,8 @@ DiagsConfig::reconfigure_diags()
   all_found = true;
 
   // initial value set to 0 or 1 based on command line tags
-  c.enabled[DiagsTagType_Debug]  = (diags->base_debug_tags != nullptr);
-  c.enabled[DiagsTagType_Action] = (diags->base_action_tags != nullptr);
+  c.enabled[DiagsTagType_Debug]  = (_diags->base_debug_tags != nullptr);
+  c.enabled[DiagsTagType_Action] = (_diags->base_action_tags != nullptr);
 
   // enabled if records.config set
 
@@ -84,9 +84,9 @@ DiagsConfig::reconfigure_diags()
   }
   all_found = all_found && found;
 
-  e                    = static_cast<int>(REC_readInteger("proxy.config.diags.show_location", &found));
-  diags->show_location = ((e == 1 && found) ? SHOW_LOCATION_DEBUG : ((e == 2 && found) ? SHOW_LOCATION_ALL : SHOW_LOCATION_NONE));
-  all_found            = all_found && found;
+  e                     = static_cast<int>(REC_readInteger("proxy.config.diags.show_location", &found));
+  _diags->show_location = ((e == 1 && found) ? SHOW_LOCATION_DEBUG : ((e == 2 && found) ? SHOW_LOCATION_ALL : SHOW_LOCATION_NONE));
+  all_found             = all_found && found;
 
   // read output routing values
   for (i = 0;; i++) {
@@ -128,23 +128,23 @@ DiagsConfig::reconfigure_diags()
     // clear out old tag tables //
     //////////////////////////////
 
-    diags->deactivate_all(DiagsTagType_Debug);
-    diags->deactivate_all(DiagsTagType_Action);
+    _diags->deactivate_all(DiagsTagType_Debug);
+    _diags->deactivate_all(DiagsTagType_Action);
 
     //////////////////////////////////////////////////////////////////////
     // add new tag tables from records.config or command line overrides //
     //////////////////////////////////////////////////////////////////////
 
-    diags->activate_taglist((diags->base_debug_tags ? diags->base_debug_tags : dt), DiagsTagType_Debug);
-    diags->activate_taglist((diags->base_action_tags ? diags->base_action_tags : at), DiagsTagType_Action);
+    _diags->activate_taglist((_diags->base_debug_tags ? _diags->base_debug_tags : dt), DiagsTagType_Debug);
+    _diags->activate_taglist((_diags->base_action_tags ? _diags->base_action_tags : at), DiagsTagType_Action);
 
 ////////////////////////////////////
 // change the diags config values //
 ////////////////////////////////////
 #if !defined(__GNUC__)
-    diags->config = c;
+    _diags->config = c;
 #else
-    memcpy(((void *)&diags->config), ((void *)&c), sizeof(DiagsConfigState));
+    memcpy(((void *)&_diags->config), ((void *)&c), sizeof(DiagsConfigState));
 #endif
     Note("updated diags config");
   }
@@ -172,7 +172,7 @@ diags_config_callback(const char * /* name ATS_UNUSED */, RecDataT /* data_type
   DiagsConfig *diagsConfig;
 
   diagsConfig = static_cast<DiagsConfig *>(opaque_token);
-  ink_assert(diags->magic == DIAGS_MAGIC);
+  ink_assert(::diags->magic == DIAGS_MAGIC);
   diagsConfig->reconfigure_diags();
   return (0);
 }
@@ -217,37 +217,37 @@ DiagsConfig::config_diags_norecords()
   //////////////////////////////
   // clear out old tag tables //
   //////////////////////////////
-  diags->deactivate_all(DiagsTagType_Debug);
-  diags->deactivate_all(DiagsTagType_Action);
+  _diags->deactivate_all(DiagsTagType_Debug);
+  _diags->deactivate_all(DiagsTagType_Action);
 
   //////////////////////////////////////////////////////////////////////
   // add new tag tables from command line overrides only              //
   //////////////////////////////////////////////////////////////////////
 
-  if (diags->base_debug_tags) {
-    diags->activate_taglist(diags->base_debug_tags, DiagsTagType_Debug);
+  if (_diags->base_debug_tags) {
+    _diags->activate_taglist(_diags->base_debug_tags, DiagsTagType_Debug);
     c.enabled[DiagsTagType_Debug] = true;
   } else {
     c.enabled[DiagsTagType_Debug] = false;
   }
 
-  if (diags->base_action_tags) {
-    diags->activate_taglist(diags->base_action_tags, DiagsTagType_Action);
+  if (_diags->base_action_tags) {
+    _diags->activate_taglist(_diags->base_action_tags, DiagsTagType_Action);
     c.enabled[DiagsTagType_Action] = true;
   } else {
     c.enabled[DiagsTagType_Action] = false;
   }
 
 #if !defined(__GNUC__)
-  diags->config = c;
+  _diags->config = c;
 #else
-  memcpy(((void *)&diags->config), ((void *)&c), sizeof(DiagsConfigState));
+  memcpy(((void *)&_diags->config), ((void *)&c), sizeof(DiagsConfigState));
 #endif
 }
 
 DiagsConfig::DiagsConfig(std::string_view prefix_string, const char *filename, const char *tags, const char *actions,
                          bool use_records)
-  : callbacks_established(false), diags_log(nullptr), diags(nullptr)
+  : callbacks_established(false), diags_log(nullptr), _diags(nullptr)
 {
   char diags_logpath[PATH_NAME_MAX];
   ats_scoped_str logpath;
@@ -259,7 +259,8 @@ DiagsConfig::DiagsConfig(std::string_view prefix_string, const char *filename, c
   ////////////////////////////////////////////////////////////////////
 
   if (!use_records) {
-    diags = new Diags(prefix_string, tags, actions, nullptr);
+    _diags  = new Diags(prefix_string, tags, actions, nullptr);
+    ::diags = _diags;
     config_diags_norecords();
     return;
   }
@@ -296,10 +297,11 @@ DiagsConfig::DiagsConfig(std::string_view prefix_string, const char *filename, c
 
   // Set up diags, FILE streams are opened in Diags constructor
   diags_log = new BaseLogFile(diags_logpath);
-  diags     = new Diags(prefix_string, tags, actions, diags_log, diags_perm_parsed, output_perm_parsed);
-  diags->config_roll_diagslog(static_cast<RollingEnabledValues>(diags_log_roll_enable), diags_log_roll_int, diags_log_roll_size);
-  diags->config_roll_outputlog(static_cast<RollingEnabledValues>(output_log_roll_enable), output_log_roll_int,
-                               output_log_roll_size);
+  _diags    = new Diags(prefix_string, tags, actions, diags_log, diags_perm_parsed, output_perm_parsed);
+  ::diags   = _diags;
+  _diags->config_roll_diagslog(static_cast<RollingEnabledValues>(diags_log_roll_enable), diags_log_roll_int, diags_log_roll_size);
+  _diags->config_roll_outputlog(static_cast<RollingEnabledValues>(output_log_roll_enable), output_log_roll_int,
+                                output_log_roll_size);
 
   Status("opened %s", diags_logpath);
 
@@ -354,5 +356,5 @@ DiagsConfig::register_diags_callbacks()
 
 DiagsConfig::~DiagsConfig()
 {
-  delete diags;
+  delete _diags;
 }
diff --git a/proxy/shared/DiagsConfig.h b/proxy/shared/DiagsConfig.h
index bccc071..d48d246 100644
--- a/proxy/shared/DiagsConfig.h
+++ b/proxy/shared/DiagsConfig.h
@@ -32,13 +32,16 @@ struct DiagsConfig {
   void parse_output_string(char *s, DiagsModeOutput *o);
   void register_diags_callbacks();
 
+  /** DiagsConfig constructor.
+   *
+   * As a side effect, this sets the global diags pointer to newly
+   * constructed _diags value.
+   */
   DiagsConfig(std::string_view prefix_string, const char *filename, const char *tags, const char *actions, bool use_records = true);
   ~DiagsConfig();
 
 private:
   bool callbacks_established;
   BaseLogFile *diags_log;
-
-public:
-  Diags *diags;
+  Diags *_diags;
 };
diff --git a/src/traffic_manager/traffic_manager.cc b/src/traffic_manager/traffic_manager.cc
index 4d4ea34..aa1d40a 100644
--- a/src/traffic_manager/traffic_manager.cc
+++ b/src/traffic_manager/traffic_manager.cc
@@ -546,7 +546,6 @@ main(int argc, const char **argv)
   // Bootstrap the Diags facility so that we can use it while starting
   //  up the manager
   diagsConfig = new DiagsConfig("Manager", DIAGS_LOG_FILENAME, debug_tags, action_tags, false);
-  diags       = diagsConfig->diags;
   diags->set_std_output(StdStream::STDOUT, bind_stdout);
   diags->set_std_output(StdStream::STDERR, bind_stderr);
 
@@ -590,13 +589,15 @@ main(int argc, const char **argv)
   RecLocalInitMessage();
   lmgmt->initAlarm();
 
-  if (diags) {
-    delete diagsConfig;
-  }
   // INKqa11968: need to set up callbacks and diags data structures
   // using configuration in records.config
-  diagsConfig = new DiagsConfig("Manager", DIAGS_LOG_FILENAME, debug_tags, action_tags, true);
-  diags       = diagsConfig->diags;
+  DiagsConfig *old_diagsconfig = diagsConfig;
+  diagsConfig                  = new DiagsConfig("Manager", DIAGS_LOG_FILENAME, debug_tags, action_tags, true);
+  if (old_diagsconfig) {
+    delete old_diagsconfig;
+    old_diagsconfig = nullptr;
+  }
+
   RecSetDiags(diags);
   diags->set_std_output(StdStream::STDOUT, bind_stdout);
   diags->set_std_output(StdStream::STDERR, bind_stderr);
diff --git a/src/traffic_server/traffic_server.cc b/src/traffic_server/traffic_server.cc
index 2a17303..884e25b 100644
--- a/src/traffic_server/traffic_server.cc
+++ b/src/traffic_server/traffic_server.cc
@@ -1778,7 +1778,6 @@ main(int /* argc ATS_UNUSED */, const char **argv)
   // This is also needed for log rotation - setting up the file can cause privilege
   // related errors and if diagsConfig isn't get up yet that will crash on a NULL pointer.
   diagsConfig = new DiagsConfig("Server", DIAGS_LOG_FILENAME, error_tags, action_tags, false);
-  diags       = diagsConfig->diags;
   diags->set_std_output(StdStream::STDOUT, bind_stdout);
   diags->set_std_output(StdStream::STDERR, bind_stderr);
   if (is_debug_tag_set("diags")) {
@@ -1873,7 +1872,6 @@ main(int /* argc ATS_UNUSED */, const char **argv)
   // Re-initialize diagsConfig based on records.config configuration
   DiagsConfig *old_log = diagsConfig;
   diagsConfig          = new DiagsConfig("Server", DIAGS_LOG_FILENAME, error_tags, action_tags, true);
-  diags                = diagsConfig->diags;
   RecSetDiags(diags);
   diags->set_std_output(StdStream::STDOUT, bind_stdout);
   diags->set_std_output(StdStream::STDERR, bind_stderr);
diff --git a/src/tscore/LogMessage.cc b/src/tscore/LogMessage.cc
new file mode 100644
index 0000000..91223ab
--- /dev/null
+++ b/src/tscore/LogMessage.cc
@@ -0,0 +1,275 @@
+/** @file
+
+  LogMessage implementation.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include "tscore/LogMessage.h"
+
+#include "tscore/Diags.h"
+
+using namespace std::chrono_literals;
+
+std::atomic<std::chrono::milliseconds> LogMessage::_default_log_throttling_interval{0ms};
+std::atomic<std::chrono::milliseconds> LogMessage::_default_debug_throttling_interval{0ms};
+
+// static
+void
+LogMessage::set_default_log_throttling_interval(std::chrono::milliseconds new_interval)
+{
+  _default_log_throttling_interval = new_interval;
+}
+
+// static
+void
+LogMessage::set_default_debug_throttling_interval(std::chrono::milliseconds new_interval)
+{
+  _default_debug_throttling_interval = new_interval;
+}
+
+void
+LogMessage::message_helper(std::chrono::microseconds current_configured_interval, log_function_f log_function, const char *fmt,
+                           va_list args)
+{
+  if (!_is_throttled) {
+    // If throttling is disabled, make this operation as efficient as possible.
+    // Simply log and exit without consulting the Throttler API.
+    //
+    // If the user changes the throttling value from some non-zero value to
+    // zero, then we may miss out on some "The following message was
+    // suppressed" logs. However we accept this as a tradeoff to make this
+    // common case as fast as possible.
+    log_function(fmt, args);
+    return;
+  }
+  if (!_throttling_value_is_explicitly_set) {
+    set_throttling_interval(current_configured_interval);
+  }
+  uint64_t number_of_suppressions = 0;
+  if (is_throttled(number_of_suppressions)) {
+    // The messages are the same and but we're still within the throttling
+    // interval. Suppress this message.
+    return;
+  }
+  // If we get here, the message should not be suppressed.
+  if (number_of_suppressions > 0) {
+    // We use no format parameters, so we just need an empty va_list.
+    va_list empty_args;
+    std::string message =
+      std::string("The following message was suppressed ") + std::to_string(number_of_suppressions) + std::string(" times.");
+    log_function(message.c_str(), empty_args);
+  }
+  log_function(fmt, args);
+}
+
+void
+LogMessage::standard_message_helper(DiagsLevel level, SourceLocation const &loc, const char *fmt, va_list args)
+{
+  message_helper(
+    _default_log_throttling_interval.load(),
+    [level, &loc](const char *fmt, va_list args) { diags->error_va(level, &loc, fmt, args); }, fmt, args);
+}
+
+void
+LogMessage::message_debug_helper(const char *tag, DiagsLevel level, SourceLocation const &loc, const char *fmt, va_list args)
+{
+  message_helper(
+    _default_debug_throttling_interval.load(),
+    [tag, level, &loc](const char *fmt, va_list args) { diags->log_va(tag, level, &loc, fmt, args); }, fmt, args);
+}
+
+void
+LogMessage::message_print_helper(const char *tag, DiagsLevel level, SourceLocation const &loc, const char *fmt, va_list args)
+{
+  message_helper(
+    _default_debug_throttling_interval.load(),
+    [tag, level, &loc](const char *fmt, va_list args) { diags->print_va(tag, level, &loc, fmt, args); }, fmt, args);
+}
+
+LogMessage::LogMessage(bool is_throttled)
+  // Turn throttling off by default. Each log event will check the configured
+  // throttling interval.
+  : Throttler{std::chrono::milliseconds{0}}, _throttling_value_is_explicitly_set{false}, _is_throttled{is_throttled}
+{
+}
+
+LogMessage::LogMessage(std::chrono::milliseconds throttling_interval)
+  : Throttler{throttling_interval}, _throttling_value_is_explicitly_set{true}, _is_throttled{throttling_interval != 0ms}
+{
+}
+
+void
+LogMessage::diag(const char *tag, SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  message_debug_helper(tag, DL_Diag, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::debug(const char *tag, SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  message_debug_helper(tag, DL_Debug, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::status(SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  standard_message_helper(DL_Status, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::note(SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  standard_message_helper(DL_Note, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::warning(SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  standard_message_helper(DL_Warning, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::error(SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  standard_message_helper(DL_Error, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::fatal(SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  standard_message_helper(DL_Fatal, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::alert(SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  standard_message_helper(DL_Alert, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::emergency(SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  standard_message_helper(DL_Emergency, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::message(DiagsLevel level, SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  standard_message_helper(level, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::print(const char *tag, DiagsLevel level, SourceLocation const &loc, const char *fmt, ...)
+{
+  va_list args;
+  va_start(args, fmt);
+  message_print_helper(tag, level, loc, fmt, args);
+  va_end(args);
+}
+
+void
+LogMessage::diag_va(const char *tag, SourceLocation const &loc, const char *fmt, va_list args)
+{
+  message_debug_helper(tag, DL_Diag, loc, fmt, args);
+}
+
+void
+LogMessage::debug_va(const char *tag, SourceLocation const &loc, const char *fmt, va_list args)
+{
+  message_debug_helper(tag, DL_Debug, loc, fmt, args);
+}
+
+void
+LogMessage::status_va(SourceLocation const &loc, const char *fmt, va_list args)
+{
+  standard_message_helper(DL_Status, loc, fmt, args);
+}
+
+void
+LogMessage::note_va(SourceLocation const &loc, const char *fmt, va_list args)
+{
+  standard_message_helper(DL_Note, loc, fmt, args);
+}
+
+void
+LogMessage::warning_va(SourceLocation const &loc, const char *fmt, va_list args)
+{
+  standard_message_helper(DL_Warning, loc, fmt, args);
+}
+
+void
+LogMessage::error_va(SourceLocation const &loc, const char *fmt, va_list args)
+{
+  standard_message_helper(DL_Error, loc, fmt, args);
+}
+
+void
+LogMessage::fatal_va(SourceLocation const &loc, const char *fmt, va_list args)
+{
+  standard_message_helper(DL_Fatal, loc, fmt, args);
+}
+
+void
+LogMessage::alert_va(SourceLocation const &loc, const char *fmt, va_list args)
+{
+  standard_message_helper(DL_Alert, loc, fmt, args);
+}
+
+void
+LogMessage::emergency_va(SourceLocation const &loc, const char *fmt, va_list args)
+{
+  standard_message_helper(DL_Emergency, loc, fmt, args);
+}
+
+void
+LogMessage::message_va(DiagsLevel level, SourceLocation const &loc, const char *fmt, va_list args)
+{
+  standard_message_helper(level, loc, fmt, args);
+}
diff --git a/src/tscore/Makefile.am b/src/tscore/Makefile.am
index ed6788e..8d53dbf 100644
--- a/src/tscore/Makefile.am
+++ b/src/tscore/Makefile.am
@@ -117,6 +117,8 @@ libtscore_la_SOURCES = \
 	signals.cc \
 	SourceLocation.cc \
 	TextBuffer.cc \
+	LogMessage.cc \
+	Throttler.cc \
 	Tokenizer.cc \
 	ts_file.cc \
 	Version.cc \
@@ -177,12 +179,13 @@ test_tscore_SOURCES = \
 	unit_tests/test_List.cc \
 	unit_tests/test_MemArena.cc \
 	unit_tests/test_MT_hashtable.cc \
-  unit_tests/test_ParseRules.cc \
+	unit_tests/test_ParseRules.cc \
 	unit_tests/test_PriorityQueue.cc \
 	unit_tests/test_Ptr.cc \
 	unit_tests/test_Regex.cc \
 	unit_tests/test_Scalar.cc \
 	unit_tests/test_scoped_resource.cc \
+	unit_tests/test_Throttler.cc \
 	unit_tests/test_Tokenizer.cc \
 	unit_tests/test_ts_file.cc \
 	unit_tests/test_Version.cc \
diff --git a/src/tscore/Throttler.cc b/src/tscore/Throttler.cc
new file mode 100644
index 0000000..99ce38c
--- /dev/null
+++ b/src/tscore/Throttler.cc
@@ -0,0 +1,57 @@
+/** @file
+
+  Implement Throttler.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include "tscore/Throttler.h"
+
+Throttler::Throttler(std::chrono::microseconds interval) : _interval{interval} {}
+
+bool
+Throttler::is_throttled(uint64_t &skipped_count)
+{
+  TimePoint const now = Clock::now();
+  TimePoint last_allowed_time{_last_allowed_time};
+  if ((last_allowed_time + _interval.load()) <= now) {
+    if (_last_allowed_time.compare_exchange_strong(last_allowed_time, now)) {
+      skipped_count     = _suppressed_count;
+      _suppressed_count = 0;
+      return false;
+    }
+  }
+  ++_suppressed_count;
+  return true;
+}
+
+uint64_t
+Throttler::reset_counter()
+{
+  _last_allowed_time       = Clock::now();
+  auto const skipped_count = _suppressed_count;
+  _suppressed_count        = 0;
+  return skipped_count;
+}
+
+void
+Throttler::set_throttling_interval(std::chrono::microseconds new_interval)
+{
+  _interval = new_interval;
+}
diff --git a/src/tscore/unit_tests/test_Throttler.cc b/src/tscore/unit_tests/test_Throttler.cc
new file mode 100644
index 0000000..5dba843
--- /dev/null
+++ b/src/tscore/unit_tests/test_Throttler.cc
@@ -0,0 +1,55 @@
+/**
+  @file Test for Regex.cc
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+*/
+
+#include "tscore/Throttler.h"
+#include "catch.hpp"
+
+#include <chrono>
+#include <thread>
+
+using namespace std::literals;
+
+TEST_CASE("Throttler", "[libts][Throttler]")
+{
+  auto const periodicity = 100ms;
+  Throttler throttler(periodicity);
+  uint64_t skipped_count = 0;
+
+  // The first check should be allowed.
+  CHECK_FALSE(throttler.is_throttled(skipped_count));
+
+  // The first time this is called, none were skipped.
+  CHECK(skipped_count == 0);
+
+  // In rapid succession, do a few more that should be skipped.
+  auto const expected_skip_count = 5u;
+  for (auto i = 0u; i < expected_skip_count; ++i) {
+    CHECK(throttler.is_throttled(skipped_count));
+  }
+
+  // Sleep more than enough time for the throttler to allow the following
+  // check.
+  std::this_thread::sleep_for(2 * periodicity);
+
+  CHECK_FALSE(throttler.is_throttled(skipped_count));
+  CHECK(skipped_count == expected_skip_count);
+}