You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2019/12/22 20:12:57 UTC

[trafficserver] 02/02: auto delete rolled log file fixes

This is an automated email from the ASF dual-hosted git repository.

zwoop pushed a commit to branch 9.0.x
in repository https://gitbox.apache.org/repos/asf/trafficserver.git

commit d810d66a8f8c20d074d91bd2d0855e888b2cf553
Author: bneradt <bn...@verizonmedia.com>
AuthorDate: Mon Nov 25 23:09:14 2019 +0000

    auto delete rolled log file fixes
    
    This fixes auto delete registration for:
    
    - core log files that were not getting deleted (such as error.log and
      manager.log) as well as...
    - plugin log files.
    
    To make log deletion registration more automatic, I placed it in LogObject
    initialization so that it doesn't have to happen in as many disparate places.
    By doing this, plugin log auto delete comes for free.  This is why nothing in
    the core plugin implementation needed to be changed to register their log
    files, for instance. Not all log objects are managed by LogObject, however, so
    we still have some manual calls to registration in LogConfig for core log files
    (such as traffic.out and diags.log, for example).
    
    Before this change, LogConfig manipulated deleting_info to create and maintain
    candidates. To encapsulate this logic, I created RolledLogDeleter and changed
    LogConfig to interact with an instance of that. This simplified the LogConfig
    logic while also enabling unit testability of the main log deletion feature.
    
    This also:
    
    - Adds a rolling_max_count autest.
    - Fixes a shutdown leak concerning LogDeletingInfo.
    - Fixes a parameter mismatch between TextLogObject and LogObject.
    
    (cherry picked from commit fa6e5731762ed41c6f31aa89b18579362302d326)
---
 include/tscore/Diags.h                            |   4 +-
 include/tscore/ts_file.h                          |   3 +
 iocore/cache/test/main.cc                         |   2 +-
 iocore/net/quic/test/event_processor_main.cc      |   2 +-
 iocore/net/quic/test/main.cc                      |   2 +-
 proxy/http3/test/main.cc                          |   2 +-
 proxy/http3/test/main_qpack.cc                    |   2 +-
 proxy/logging/Log.cc                              |   8 +-
 proxy/logging/LogConfig.cc                        | 108 +++--
 proxy/logging/LogConfig.h                         |  85 +---
 proxy/logging/LogObject.cc                        |  16 +-
 proxy/logging/LogObject.h                         |  10 +-
 proxy/logging/Makefile.am                         |  29 +-
 proxy/logging/RolledLogDeleter.cc                 | 135 ++++++
 proxy/logging/RolledLogDeleter.h                  | 214 ++++++++++
 proxy/logging/YamlLogConfig.cc                    |  26 +-
 proxy/logging/unit-tests/test_LogUtils.h          |   2 +
 proxy/logging/unit-tests/test_RolledLogDeleter.cc | 319 +++++++++++++++
 proxy/shared/DiagsConfig.cc                       |   3 +-
 proxy/shared/DiagsConfig.h                        |   2 +-
 src/traffic_server/InkAPI.cc                      |   2 +-
 src/tscore/Diags.cc                               |   9 +-
 tests/gold_tests/logging/log_retention.test.py    | 477 +++++++++++++++++++---
 tests/tools/plugins/test_log_interface.cc         |  97 +++++
 24 files changed, 1331 insertions(+), 228 deletions(-)

diff --git a/include/tscore/Diags.h b/include/tscore/Diags.h
index a6274c6..23c6696 100644
--- a/include/tscore/Diags.h
+++ b/include/tscore/Diags.h
@@ -111,7 +111,7 @@ struct DiagsConfigState {
 class Diags
 {
 public:
-  Diags(const char *prefix_string, const char *base_debug_tags, const char *base_action_tags, BaseLogFile *_diags_log,
+  Diags(std::string_view prefix_string, const char *base_debug_tags, const char *base_action_tags, BaseLogFile *_diags_log,
         int diags_log_perm = -1, int output_log_perm = -1);
   virtual ~Diags();
 
@@ -230,7 +230,7 @@ public:
   IpAddr debug_client_ip;
 
 private:
-  const char *prefix_str;
+  const std::string prefix_str;
   mutable ink_mutex tag_table_lock; // prevents reconfig/read races
   DFA *activated_tags[2];           // 1 table for debug, 1 for action
 
diff --git a/include/tscore/ts_file.h b/include/tscore/ts_file.h
index c058c00..c4389e9 100644
--- a/include/tscore/ts_file.h
+++ b/include/tscore/ts_file.h
@@ -180,6 +180,9 @@ namespace file
 
   // Return the filename derived from path p.
   //
+  // This is made to match the std::filesystem::path::filename behavior:
+  //   https://en.cppreference.com/w/cpp/filesystem/path/filename
+  //
   // Examples:
   //   given "/foo/bar.txt", this returns "bar.txt"
   //   given "/foo/bar", this returns "bar"
diff --git a/iocore/cache/test/main.cc b/iocore/cache/test/main.cc
index 6e19960..7c57a0c 100644
--- a/iocore/cache/test/main.cc
+++ b/iocore/cache/test/main.cc
@@ -42,7 +42,7 @@ struct EventProcessorListener : Catch::TestEventListenerBase {
   testRunStarting(Catch::TestRunInfo const &testRunInfo) override
   {
     BaseLogFile *base_log_file = new BaseLogFile("stderr");
-    diags                      = new Diags(testRunInfo.name.c_str(), "*" /* tags */, "" /* actions */, base_log_file);
+    diags                      = new Diags(testRunInfo.name, "*" /* tags */, "" /* actions */, base_log_file);
     diags->activate_taglist("cache.*|agg.*|locks", DiagsTagType_Debug);
     diags->config.enabled[DiagsTagType_Debug] = true;
     diags->show_location                      = SHOW_LOCATION_DEBUG;
diff --git a/iocore/net/quic/test/event_processor_main.cc b/iocore/net/quic/test/event_processor_main.cc
index 4c1d2dd..8963e9f 100644
--- a/iocore/net/quic/test/event_processor_main.cc
+++ b/iocore/net/quic/test/event_processor_main.cc
@@ -43,7 +43,7 @@ struct EventProcessorListener : Catch::TestEventListenerBase {
   testRunStarting(Catch::TestRunInfo const &testRunInfo) override
   {
     BaseLogFile *base_log_file = new BaseLogFile("stderr");
-    diags                      = new Diags(testRunInfo.name.c_str(), "" /* tags */, "" /* actions */, base_log_file);
+    diags                      = new Diags(testRunInfo.name, "" /* tags */, "" /* actions */, base_log_file);
     diags->activate_taglist("vv_quic|quic", DiagsTagType_Debug);
     diags->config.enabled[DiagsTagType_Debug] = true;
     diags->show_location                      = SHOW_LOCATION_DEBUG;
diff --git a/iocore/net/quic/test/main.cc b/iocore/net/quic/test/main.cc
index 96767a2..eb93ea7 100644
--- a/iocore/net/quic/test/main.cc
+++ b/iocore/net/quic/test/main.cc
@@ -39,7 +39,7 @@ struct EventProcessorListener : Catch::TestEventListenerBase {
   testRunStarting(Catch::TestRunInfo const &testRunInfo) override
   {
     BaseLogFile *base_log_file = new BaseLogFile("stderr");
-    diags                      = new Diags(testRunInfo.name.c_str(), "" /* tags */, "" /* actions */, base_log_file);
+    diags                      = new Diags(testRunInfo.name, "" /* tags */, "" /* actions */, base_log_file);
     diags->activate_taglist("vv_quic|quic", DiagsTagType_Debug);
     diags->config.enabled[DiagsTagType_Debug] = true;
     diags->show_location                      = SHOW_LOCATION_DEBUG;
diff --git a/proxy/http3/test/main.cc b/proxy/http3/test/main.cc
index 002e9d6..247e118 100644
--- a/proxy/http3/test/main.cc
+++ b/proxy/http3/test/main.cc
@@ -39,7 +39,7 @@ struct EventProcessorListener : Catch::TestEventListenerBase {
   testRunStarting(Catch::TestRunInfo const &testRunInfo) override
   {
     BaseLogFile *base_log_file = new BaseLogFile("stderr");
-    diags                      = new Diags(testRunInfo.name.c_str(), "" /* tags */, "" /* actions */, base_log_file);
+    diags                      = new Diags(testRunInfo.name, "" /* tags */, "" /* actions */, base_log_file);
     diags->activate_taglist("vv_quic|quic", DiagsTagType_Debug);
     diags->config.enabled[DiagsTagType_Debug] = true;
     diags->show_location                      = SHOW_LOCATION_DEBUG;
diff --git a/proxy/http3/test/main_qpack.cc b/proxy/http3/test/main_qpack.cc
index d062637..9bac453 100644
--- a/proxy/http3/test/main_qpack.cc
+++ b/proxy/http3/test/main_qpack.cc
@@ -56,7 +56,7 @@ struct EventProcessorListener : Catch::TestEventListenerBase {
   testRunStarting(Catch::TestRunInfo const &testRunInfo) override
   {
     BaseLogFile *base_log_file = new BaseLogFile("stderr");
-    diags                      = new Diags(testRunInfo.name.c_str(), "" /* tags */, "" /* actions */, base_log_file);
+    diags                      = new Diags(testRunInfo.name, "" /* tags */, "" /* actions */, base_log_file);
     diags->activate_taglist("qpack", DiagsTagType_Debug);
     diags->config.enabled[DiagsTagType_Debug] = true;
     diags->show_location                      = SHOW_LOCATION_DEBUG;
diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc
index 60a9460..3584cb0 100644
--- a/proxy/logging/Log.cc
+++ b/proxy/logging/Log.cc
@@ -1015,10 +1015,10 @@ Log::init_when_enabled()
     // setup global scrap object
     //
     global_scrap_format = MakeTextLogFormat();
-    global_scrap_object =
-      new LogObject(global_scrap_format, Log::config->logfile_dir, "scrapfile.log", LOG_FILE_BINARY, nullptr,
-                    Log::config->rolling_enabled, Log::config->preproc_threads, Log::config->rolling_interval_sec,
-                    Log::config->rolling_offset_hr, Log::config->rolling_size_mb);
+    global_scrap_object = new LogObject(
+      global_scrap_format, Log::config->logfile_dir, "scrapfile.log", LOG_FILE_BINARY, nullptr, Log::config->rolling_enabled,
+      Log::config->preproc_threads, Log::config->rolling_interval_sec, Log::config->rolling_offset_hr, Log::config->rolling_size_mb,
+      /* auto create */ false, Log::config->rolling_max_count, Log::config->rolling_min_count);
 
     // create the flush thread
     create_threads();
diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc
index 77e8926..41371f5 100644
--- a/proxy/logging/LogConfig.cc
+++ b/proxy/logging/LogConfig.cc
@@ -62,6 +62,7 @@
 
 #define PARTITION_HEADROOM_MB 10
 #define DIAGS_LOG_FILENAME "diags.log"
+#define MANAGER_LOG_FILENAME "manager.log"
 
 void
 LogConfig::setup_default_values()
@@ -106,6 +107,19 @@ void LogConfig::reconfigure_mgmt_variables(ts::MemSpan<void>)
 }
 
 void
+LogConfig::register_rolled_log_auto_delete(std::string_view logname, int rolling_min_count)
+{
+  if (!auto_delete_rolled_files) {
+    // Nothing to do if auto-deletion is not configured.
+    return;
+  }
+
+  Debug("logspace", "Registering rotated log deletion for %s with min roll count %d", std::string(logname).c_str(),
+        rolling_min_count);
+  rolledLogDeleter.register_log_type_for_deletion(logname, rolling_min_count);
+}
+
+void
 LogConfig::read_configuration_variables()
 {
   int val;
@@ -184,20 +198,20 @@ LogConfig::read_configuration_variables()
 
   // Read in min_count control values for auto deletion
   if (auto_delete_rolled_files) {
+    // The majority of register_rolled_log_auto_delete() updates come in
+    // through LogObject. However, not all ATS logs are managed by LogObject.
+    // The following register these other core logs for log rotation deletion.
+
     // For diagnostic logs
     val = static_cast<int>(REC_ConfigReadInteger("proxy.config.diags.logfile.rolling_min_count"));
-    val = ((val == 0) ? INT_MAX : val);
-    deleting_info.insert(new LogDeletingInfo(DIAGS_LOG_FILENAME, val));
+    register_rolled_log_auto_delete(DIAGS_LOG_FILENAME, val);
+    register_rolled_log_auto_delete(MANAGER_LOG_FILENAME, val);
 
     // For traffic.out
-    ats_scoped_str name(REC_ConfigReadString("proxy.config.output.logfile"));
-    val = static_cast<int>(REC_ConfigReadInteger("proxy.config.output.logfile.rolling_min_count"));
-    val = ((val == 0) ? INT_MAX : val);
-    if (name) {
-      deleting_info.insert(new LogDeletingInfo(name.get(), val));
-    } else {
-      deleting_info.insert(new LogDeletingInfo("traffic.out", val));
-    }
+    const char *configured_name(REC_ConfigReadString("proxy.config.output.logfile"));
+    const char *traffic_logname = configured_name ? configured_name : "traffic.out";
+    val                         = static_cast<int>(REC_ConfigReadInteger("proxy.config.output.logfile.rolling_min_count"));
+    register_rolled_log_auto_delete(traffic_logname, val);
 
     rolling_max_count = static_cast<int>(REC_ConfigReadInteger("proxy.config.log.rolling_max_count"));
   }
@@ -285,7 +299,8 @@ LogConfig::init(LogConfig *prev_config)
     Debug("log", "creating predefined error log object");
 
     errlog = new LogObject(fmt.get(), logfile_dir, "error.log", LOG_FILE_ASCII, nullptr, rolling_enabled, preproc_threads,
-                           rolling_interval_sec, rolling_offset_hr, rolling_size_mb);
+                           rolling_interval_sec, rolling_offset_hr, rolling_size_mb, /* auto_created */ false, rolling_max_count,
+                           rolling_min_count);
 
     log_object_manager.manage_object(errlog);
     errlog->set_fmt_timestamps();
@@ -561,7 +576,6 @@ LogConfig::update_space_used()
     return;
   }
 
-  int candidate_count;
   int64_t total_space_used, partition_space_left;
   char path[MAXPATHLEN];
   int sret;
@@ -603,7 +617,6 @@ LogConfig::update_space_used()
   }
 
   total_space_used = 0LL;
-  candidate_count  = 0;
 
   while ((entry = readdir(ld))) {
     snprintf(path, MAXPATHLEN, "%s/%s", logfile_dir, entry->d_name);
@@ -613,18 +626,7 @@ LogConfig::update_space_used()
       total_space_used += static_cast<int64_t>(sbuf.st_size);
 
       if (auto_delete_rolled_files && LogFile::rolled_logfile(entry->d_name)) {
-        //
-        // then check if the candidate belongs to any given log type
-        //
-        auto iter = deleting_info.find(LogUtils::get_unrolled_filename(entry->d_name));
-        if (iter == deleting_info.end()) {
-          // We won't delete the log if its name doesn't match any give type.
-          continue;
-        }
-
-        auto &candidates = iter->candidates;
-        candidates.push_back(LogDeleteCandidate(path, static_cast<int64_t>(sbuf.st_size), sbuf.st_mtime));
-        candidate_count++;
+        rolledLogDeleter.consider_for_candidacy(path, sbuf.st_size, sbuf.st_mtime);
       }
     }
   }
@@ -667,62 +669,50 @@ LogConfig::update_space_used()
   int64_t max_space = static_cast<int64_t>(get_max_space_mb()) * LOG_MEGABYTE;
   int64_t headroom  = static_cast<int64_t>(max_space_mb_headroom) * LOG_MEGABYTE;
 
-  if (candidate_count > 0 && !space_to_write(headroom)) {
+  if (!space_to_write(headroom)) {
     Debug("logspace", "headroom reached, trying to clear space ...");
-    Debug("logspace", "sorting %d delete candidates ...", candidate_count);
-
-    deleting_info.apply([](LogDeletingInfo &info) {
-      std::sort(info.candidates.begin(), info.candidates.end(),
-                [](LogDeleteCandidate const &a, LogDeleteCandidate const &b) { return a.mtime > b.mtime; });
-    });
+    if (!rolledLogDeleter.has_candidates()) {
+      Note("Cannot clear space because there are no recognized Traffic Server rolled logs for auto deletion.");
+    } else {
+      Debug("logspace", "Considering %zu delete candidates ...", rolledLogDeleter.get_candidate_count());
+    }
 
-    while (candidate_count > 0) {
+    while (rolledLogDeleter.has_candidates()) {
       if (space_to_write(headroom + log_buffer_size)) {
         Debug("logspace", "low water mark reached; stop deleting");
         break;
       }
 
-      // Select the group with biggest ratio
-      auto target =
-        std::max_element(deleting_info.begin(), deleting_info.end(), [](LogDeletingInfo const &A, LogDeletingInfo const &B) {
-          return static_cast<double>(A.candidates.size()) / A.min_count < static_cast<double>(B.candidates.size()) / B.min_count;
-        });
-
-      auto &candidates = target->candidates;
-
+      auto victim = rolledLogDeleter.take_next_candidate_to_delete();
       // Check if any candidate exists
-      if (candidates.empty()) {
+      if (!victim) {
         // This shouldn't be triggered unless min_count are configured wrong or extra non-log files occupy the directory
-        Debug("logspace", "No more victims for log type %s. Check your rolling_min_count settings and logging directory.",
-              target->name.c_str());
+        Debug("logspace", "No more victims. Check your rolling_min_count settings and logging directory.");
       } else {
-        auto &victim = candidates.back();
-        Debug("logspace", "auto-deleting %s", victim.name.c_str());
+        Debug("logspace", "auto-deleting %s", victim->rolled_log_path.c_str());
 
-        if (unlink(victim.name.c_str()) < 0) {
-          Note("Traffic Server was Unable to auto-delete rolled "
+        if (unlink(victim->rolled_log_path.c_str()) < 0) {
+          Note("Traffic Server was unable to auto-delete rolled "
                "logfile %s: %s.",
-               victim.name.c_str(), strerror(errno));
+               victim->rolled_log_path.c_str(), strerror(errno));
         } else {
           Debug("logspace",
                 "The rolled logfile, %s, was auto-deleted; "
                 "%" PRId64 " bytes were reclaimed.",
-                victim.name.c_str(), victim.size);
+                victim->rolled_log_path.c_str(), victim->size);
 
           // Update after successful unlink;
-          m_space_used -= victim.size;
-          m_partition_space_left += victim.size;
+          m_space_used -= victim->size;
+          m_partition_space_left += victim->size;
         }
-        // Update total candidates and remove victim
-        --candidate_count;
-        candidates.pop_back();
       }
     }
   }
-  //
-  // Clean up the candidate array
-  //
-  deleting_info.apply([](LogDeletingInfo &info) { info.clear(); });
+
+  // The set of files in the logs dir may change between iterations to check
+  // for logs to delete. To deal with this, we simply clear our internal
+  // candidates metadata and regenerate it on each iteration.
+  rolledLogDeleter.clear_candidates();
 
   //
   // Now that we've updated the m_space_used value, see if we need to
diff --git a/proxy/logging/LogConfig.h b/proxy/logging/LogConfig.h
index 6d69cdf..0f5eac7 100644
--- a/proxy/logging/LogConfig.h
+++ b/proxy/logging/LogConfig.h
@@ -26,11 +26,11 @@
 #include <string_view>
 #include <string>
 
-#include "tscore/IntrusiveHashMap.h"
 #include "tscore/ink_platform.h"
 #include "records/P_RecProcess.h"
 #include "ProxyConfig.h"
 #include "LogObject.h"
+#include "RolledLogDeleter.h"
 #include "tscpp/util/MemSpan.h"
 
 /* Instead of enumerating the stats in DynamicStats.h, each module needs
@@ -79,73 +79,6 @@ extern RecRawStatBlock *log_rsb;
 struct dirent;
 
 /*-------------------------------------------------------------------------
-  LogDeleteCandidate, LogDeletingInfo&Descriptor
-  -------------------------------------------------------------------------*/
-
-struct LogDeleteCandidate {
-  std::string name;
-  int64_t size;
-  time_t mtime;
-
-  LogDeleteCandidate(char *p_name, int64_t st_size, time_t st_time) : name(p_name), size(st_size), mtime(st_time) {}
-};
-
-struct LogDeletingInfo {
-  std::string name;
-  int min_count;
-  std::vector<LogDeleteCandidate> candidates;
-
-  LogDeletingInfo *_next{nullptr};
-  LogDeletingInfo *_prev{nullptr};
-
-  LogDeletingInfo(const char *type, int limit) : name(type), min_count(limit) {}
-  LogDeletingInfo(std::string_view type, int limit) : name(type), min_count(limit) {}
-
-  void
-  clear()
-  {
-    candidates.clear();
-  }
-};
-
-struct LogDeletingInfoDescriptor {
-  using key_type   = std::string_view;
-  using value_type = LogDeletingInfo;
-
-  static key_type
-  key_of(value_type *value)
-  {
-    return value->name;
-  }
-
-  static bool
-  equal(key_type const &lhs, key_type const &rhs)
-  {
-    return lhs == rhs;
-  }
-
-  static value_type *&
-  next_ptr(value_type *value)
-  {
-    return value->_next;
-  }
-
-  static value_type *&
-  prev_ptr(value_type *value)
-  {
-    return value->_prev;
-  }
-
-  static constexpr std::hash<std::string_view> hasher{};
-
-  static auto
-  hash_of(key_type s) -> decltype(hasher(s))
-  {
-    return hasher(s);
-  }
-};
-
-/*-------------------------------------------------------------------------
   this object keeps the state of the logging configuraion variables.  upon
   construction, the log configuration file is read and the logging
   variables are initialized.
@@ -224,6 +157,18 @@ public:
     return log_object_manager.has_api_objects();
   }
 
+  /** Register rolled logs of logname for auto-deletion when there are space
+   * constraints.
+   *
+   * @param[in] logname The name of the unrolled log to register, such as
+   * "diags.log".
+   *
+   * @param[in] rolling_min_count The minimum number of rolled logs of logname
+   * to try to keep around. A value of 0 expresses a desire to keep all rolled
+   * files, if possible.
+   */
+  void register_rolled_log_auto_delete(std::string_view logname, int rolling_min_count);
+
 public:
   bool initialized             = false;
   bool reconfiguration_needed  = false;
@@ -254,8 +199,6 @@ public:
   bool rolling_allow_empty;
   bool auto_delete_rolled_files;
 
-  IntrusiveHashMap<LogDeletingInfoDescriptor> deleting_info;
-
   int sampling_frequency;
   int file_stat_frequency;
   int space_used_frequency;
@@ -278,6 +221,8 @@ private:
   bool m_partition_low              = false;
   bool m_log_directory_inaccessible = false;
 
+  RolledLogDeleter rolledLogDeleter;
+
   // noncopyable
   // -- member functions not allowed --
   LogConfig(const LogConfig &) = delete;
diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc
index 040c73d..86d2a45 100644
--- a/proxy/logging/LogObject.cc
+++ b/proxy/logging/LogObject.cc
@@ -90,8 +90,8 @@ LogBufferManager::preproc_buffers(LogBufferSink *sink)
 
 LogObject::LogObject(const LogFormat *format, const char *log_dir, const char *basename, LogFileFormat file_format,
                      const char *header, Log::RollingEnabledValues rolling_enabled, int flush_threads, int rolling_interval_sec,
-                     int rolling_offset_hr, int rolling_size_mb, bool auto_created, int max_rolled, bool reopen_after_rolling,
-                     int pipe_buffer_size)
+                     int rolling_offset_hr, int rolling_size_mb, bool auto_created, int rolling_max_count, int rolling_min_count,
+                     bool reopen_after_rolling, int pipe_buffer_size)
   : m_alt_filename(nullptr),
     m_flags(0),
     m_signature(0),
@@ -100,7 +100,8 @@ LogObject::LogObject(const LogFormat *format, const char *log_dir, const char *b
     m_rolling_offset_hr(rolling_offset_hr),
     m_rolling_size_mb(rolling_size_mb),
     m_last_roll_time(0),
-    m_max_rolled(max_rolled),
+    m_max_rolled(rolling_max_count),
+    m_min_rolled(rolling_min_count),
     m_reopen_after_rolling(reopen_after_rolling),
     m_buffer_manager_idx(0),
     m_pipe_buffer_size(pipe_buffer_size)
@@ -150,6 +151,7 @@ LogObject::LogObject(LogObject &rhs)
     m_rolling_size_mb(rhs.m_rolling_size_mb),
     m_last_roll_time(rhs.m_last_roll_time),
     m_max_rolled(rhs.m_max_rolled),
+    m_min_rolled(rhs.m_min_rolled),
     m_reopen_after_rolling(rhs.m_reopen_after_rolling),
     m_buffer_manager_idx(rhs.m_buffer_manager_idx),
     m_pipe_buffer_size(rhs.m_pipe_buffer_size)
@@ -687,7 +689,7 @@ LogObject::_setup_rolling(Log::RollingEnabledValues rolling_enabled, int rolling
         m_rolling_size_mb = rolling_size_mb;
       }
     }
-
+    Log::config->register_rolled_log_auto_delete(m_basename, m_min_rolled);
     m_rolling_enabled = rolling_enabled;
   }
 }
@@ -789,9 +791,11 @@ const LogFormat *TextLogObject::textfmt = MakeTextLogFormat();
 
 TextLogObject::TextLogObject(const char *name, const char *log_dir, bool timestamps, const char *header,
                              Log::RollingEnabledValues rolling_enabled, int flush_threads, int rolling_interval_sec,
-                             int rolling_offset_hr, int rolling_size_mb, int max_rolled, bool reopen_after_rolling)
+                             int rolling_offset_hr, int rolling_size_mb, int rolling_max_count, int rolling_min_count,
+                             bool reopen_after_rolling)
   : LogObject(TextLogObject::textfmt, log_dir, name, LOG_FILE_ASCII, header, rolling_enabled, flush_threads, rolling_interval_sec,
-              rolling_offset_hr, rolling_size_mb, max_rolled, reopen_after_rolling)
+              rolling_offset_hr, rolling_size_mb, /* auto_created */ false, rolling_max_count, rolling_min_count,
+              reopen_after_rolling)
 {
   if (timestamps) {
     this->set_fmt_timestamps();
diff --git a/proxy/logging/LogObject.h b/proxy/logging/LogObject.h
index 32dafb1..87f5c80 100644
--- a/proxy/logging/LogObject.h
+++ b/proxy/logging/LogObject.h
@@ -95,8 +95,8 @@ public:
 
   LogObject(const LogFormat *format, const char *log_dir, const char *basename, LogFileFormat file_format, const char *header,
             Log::RollingEnabledValues rolling_enabled, int flush_threads, int rolling_interval_sec = 0, int rolling_offset_hr = 0,
-            int rolling_size_mb = 0, bool auto_created = false, int rolling_max_count = 0, bool reopen_after_rolling = false,
-            int pipe_buffer_size = 0);
+            int rolling_size_mb = 0, bool auto_created = false, int rolling_max_count = 0, int rolling_min_count = 0,
+            bool reopen_after_rolling = false, int pipe_buffer_size = 0);
   LogObject(LogObject &);
   ~LogObject() override;
 
@@ -280,6 +280,7 @@ private:
   int m_rolling_size_mb;       // size at which the log file rolls
   long m_last_roll_time;       // the last time this object rolled its files
   int m_max_rolled;            // maximum number of rolled logs to be kept, 0 no limit
+  int m_min_rolled;            // minimum number of rolled logs to try to keep, 0 means keep all if possible
   bool m_reopen_after_rolling; // reopen log file after rolling (normally it is just renamed and closed)
 
   head_p m_log_buffer; // current work buffer
@@ -313,7 +314,8 @@ class TextLogObject : public LogObject
 public:
   inkcoreapi TextLogObject(const char *name, const char *log_dir, bool timestamps, const char *header,
                            Log::RollingEnabledValues rolling_enabled, int flush_threads, int rolling_interval_sec,
-                           int rolling_offset_hr, int rolling_size_mb, int max_rolled, bool reopen_after_rolling);
+                           int rolling_offset_hr, int rolling_size_mb, int rolling_max_count, int rolling_min_count,
+                           bool reopen_after_rolling);
 
   inkcoreapi int write(const char *format, ...) TS_PRINTFLIKE(2, 3);
   inkcoreapi int va_write(const char *format, va_list ap);
@@ -414,7 +416,7 @@ LogObject::operator==(LogObject &old)
           strcmp(m_logFile->get_name(), old.m_logFile->get_name()) == 0 && (m_filter_list == old.m_filter_list) &&
           (m_rolling_interval_sec == old.m_rolling_interval_sec && m_rolling_offset_hr == old.m_rolling_offset_hr &&
            m_rolling_size_mb == old.m_rolling_size_mb && m_reopen_after_rolling == old.m_reopen_after_rolling &&
-           m_max_rolled == old.m_max_rolled));
+           m_max_rolled == old.m_max_rolled && m_min_rolled == old.m_min_rolled));
 }
 
 inline off_t
diff --git a/proxy/logging/Makefile.am b/proxy/logging/Makefile.am
index abf8e86..58b6b55 100644
--- a/proxy/logging/Makefile.am
+++ b/proxy/logging/Makefile.am
@@ -61,19 +61,24 @@ liblogging_a_SOURCES = \
 	LogObject.h \
 	LogUtils.cc \
 	LogUtils.h \
+	RolledLogDeleter.cc \
+	RolledLogDeleter.h \
 	YamlLogConfig.cc \
 	YamlLogConfigDecoders.cc \
 	YamlLogConfig.h
 
 check_PROGRAMS = \
 	test_LogUtils \
-	test_LogUtils2
+	test_LogUtils2 \
+	test_RolledLogDeleter
 
 TESTS = \
 	test_LogUtils \
-	test_LogUtils2
+	test_LogUtils2 \
+	test_RolledLogDeleter
 
-test_LogUtils_CPPFLAGS =  $(AM_CPPFLAGS)\
+test_LogUtils_CPPFLAGS = \
+	$(AM_CPPFLAGS) \
 	-DTEST_LOG_UTILS
 
 test_LogUtils_SOURCES = \
@@ -84,7 +89,8 @@ test_LogUtils_LDADD = \
 	$(top_builddir)/src/tscpp/util/libtscpputil.la \
 	$(top_builddir)/iocore/eventsystem/libinkevent.a
 
-test_LogUtils2_CPPFLAGS =  $(AM_CPPFLAGS)\
+test_LogUtils2_CPPFLAGS = \
+	$(AM_CPPFLAGS) \
 	-DTEST_LOG_UTILS \
 	-I$(abs_top_srcdir)/tests/include
 
@@ -98,5 +104,20 @@ test_LogUtils2_LDADD = \
 	$(top_builddir)/src/tscpp/util/libtscpputil.la \
 	$(top_builddir)/iocore/eventsystem/libinkevent.a
 
+test_RolledLogDeleter_CPPFLAGS = \
+	$(AM_CPPFLAGS) \
+	-DTEST_LOG_UTILS \
+	-I$(abs_top_srcdir)/tests/include
+
+test_RolledLogDeleter_SOURCES = \
+	RolledLogDeleter.cc \
+	LogUtils.cc \
+	unit-tests/test_RolledLogDeleter.cc
+
+test_RolledLogDeleter_LDADD = \
+	$(top_builddir)/src/tscore/libtscore.la \
+	$(top_builddir)/src/tscpp/util/libtscpputil.la \
+	$(top_builddir)/iocore/eventsystem/libinkevent.a
+
 clang-tidy-local: $(liblogging_a_SOURCES) $(EXTRA_DIST)
 	$(CXX_Clang_Tidy)
diff --git a/proxy/logging/RolledLogDeleter.cc b/proxy/logging/RolledLogDeleter.cc
new file mode 100644
index 0000000..2d51738
--- /dev/null
+++ b/proxy/logging/RolledLogDeleter.cc
@@ -0,0 +1,135 @@
+/** @file
+
+  This file implements the rolled log deletion.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include <climits>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "RolledLogDeleter.h"
+#include "LogUtils.h"
+#include "tscore/ts_file.h"
+#include "tscpp/util/TextView.h"
+
+namespace fs = ts::file;
+
+LogDeletingInfo::LogDeletingInfo(const char *_logname, int _min_count)
+  : logname(_logname),
+    /**
+     * A min_count of zero indicates a request to try to keep all rotated logs
+     * around. By setting min_count to INT_MAX in these cases, we make the rolled
+     * log deletion priority small.
+     *
+     * @note This cannot have a zero value because it is used as the denominator
+     * in a division operation when calculating the log deletion preference.
+     */
+    min_count((_min_count > 0) ? _min_count : INT_MAX)
+{
+}
+
+LogDeletingInfo::LogDeletingInfo(std::string_view _logname, int _min_count)
+  : logname(_logname),
+    /**
+     * A min_count of zero indicates a request to try to keep all rotated logs
+     * around. By setting min_count to INT_MAX in these cases, we make the rolled
+     * log deletion priority small.
+     *
+     * @note This cannot have a zero value because it is used as the denominator
+     * in a division operation when calculating the log deletion preference.
+     */
+    min_count((_min_count > 0) ? _min_count : INT_MAX)
+{
+}
+
+void
+RolledLogDeleter::register_log_type_for_deletion(std::string_view log_type, int rolling_min_count)
+{
+  auto deletingInfo     = std::make_unique<LogDeletingInfo>(log_type, rolling_min_count);
+  auto *deletingInfoPtr = deletingInfo.get();
+
+  deletingInfoList.push_back(std::move(deletingInfo));
+  deleting_info.insert(deletingInfoPtr);
+}
+
+bool
+RolledLogDeleter::consider_for_candidacy(std::string_view log_path, int64_t file_size, time_t modification_time)
+{
+  const fs::path rolled_log_file = fs::filename(log_path);
+  auto iter                      = deleting_info.find(LogUtils::get_unrolled_filename(rolled_log_file.view()));
+  if (iter == deleting_info.end()) {
+    return false;
+  }
+  auto &candidates = iter->candidates;
+  candidates.push_back(std::make_unique<LogDeleteCandidate>(log_path, file_size, modification_time));
+  ++num_candidates;
+
+  std::sort(
+    candidates.begin(), candidates.end(),
+    [](std::unique_ptr<LogDeleteCandidate> const &a, std::unique_ptr<LogDeleteCandidate> const &b) { return a->mtime > b->mtime; });
+
+  return true;
+}
+
+std::unique_ptr<LogDeleteCandidate>
+RolledLogDeleter::take_next_candidate_to_delete()
+{
+  if (!has_candidates()) {
+    return nullptr;
+  }
+  // Select the highest priority type (diags.log, traffic.out, etc.) from which
+  // to select a candidate.
+  auto target_type =
+    std::max_element(deleting_info.begin(), deleting_info.end(), [](LogDeletingInfo const &A, LogDeletingInfo const &B) {
+      return static_cast<double>(A.candidates.size()) / A.min_count < static_cast<double>(B.candidates.size()) / B.min_count;
+    });
+
+  auto &candidates = target_type->candidates;
+  if (candidates.empty()) {
+    return nullptr;
+  }
+
+  // Return the highest priority candidate among the candidates of that type.
+  auto victim = std::move(candidates.back());
+  candidates.pop_back();
+  --num_candidates;
+  return victim;
+}
+
+bool
+RolledLogDeleter::has_candidates() const
+{
+  return get_candidate_count() != 0;
+}
+
+size_t
+RolledLogDeleter::get_candidate_count() const
+{
+  return num_candidates;
+}
+
+void
+RolledLogDeleter::clear_candidates()
+{
+  deleting_info.apply([](LogDeletingInfo &info) { info.clear(); });
+  num_candidates = 0;
+}
diff --git a/proxy/logging/RolledLogDeleter.h b/proxy/logging/RolledLogDeleter.h
new file mode 100644
index 0000000..f03785f
--- /dev/null
+++ b/proxy/logging/RolledLogDeleter.h
@@ -0,0 +1,214 @@
+/** @file
+
+  This contains the rotated log deletion mechanism.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <string_view>
+#include <time.h>
+#include <vector>
+
+#include "tscore/IntrusiveHashMap.h"
+
+/*-------------------------------------------------------------------------
+  LogDeleteCandidate, LogDeletingInfo&Descriptor
+  -------------------------------------------------------------------------*/
+
+struct LogDeleteCandidate {
+  /** The filename for this rolled log deletion candidate.
+   *
+   * For example: /var/log/my_log.log_a_host_name.20191122.20h18m35s-20191122.20h18m51s.old
+   */
+  std::string rolled_log_path;
+  int64_t size;
+  time_t mtime;
+
+  LogDeleteCandidate(std::string_view p_name, int64_t st_size, time_t st_time)
+    : rolled_log_path(p_name), size(st_size), mtime(st_time)
+  {
+  }
+};
+
+/** Configure rolled log deletion for a set of logs.
+ *
+ * This contains the configuration and set of log deletion candidates for a set
+ * of log files associated with logname. There will be an instance of this for
+ * diags.log and its associated rolled log files, one for traffic.out, etc.
+ */
+struct LogDeletingInfo {
+  /** The unrolled log name (such as "diags.log"). */
+  const std::string logname;
+
+  /** The minimum number of rolled log files to try to keep around.
+   *
+   * @note This is guaranteed to be a positive (non-zero) value.
+   */
+  int min_count;
+
+  std::vector<std::unique_ptr<LogDeleteCandidate>> candidates;
+
+  LogDeletingInfo *_next{nullptr};
+  LogDeletingInfo *_prev{nullptr};
+
+  /**
+   * @param[in] logname The unrolled log name.
+   *
+   * @param[in] min_count The minimum number of rolled files to try to keep
+   * around when deleting rolled logs. A zero indicates a desire to keep all
+   * rolled logs around.
+   *
+   * @note The min_count is used as a part of a calculation to determine which
+   * set of deletion candidates should be used for selecting a rolled log file
+   * to delete. If space is particularly constrained, even LogDeletingInfo
+   * instances with a min_count of 0 may be selected for deletion.
+   */
+  LogDeletingInfo(const char *logname, int min_count);
+  LogDeletingInfo(std::string_view logname, int min_count);
+
+  void
+  clear()
+  {
+    candidates.clear();
+  }
+};
+
+struct LogDeletingInfoDescriptor {
+  using key_type   = std::string_view;
+  using value_type = LogDeletingInfo;
+
+  static key_type
+  key_of(value_type *value)
+  {
+    return value->logname;
+  }
+
+  static bool
+  equal(key_type const &lhs, key_type const &rhs)
+  {
+    return lhs == rhs;
+  }
+
+  static value_type *&
+  next_ptr(value_type *value)
+  {
+    return value->_next;
+  }
+
+  static value_type *&
+  prev_ptr(value_type *value)
+  {
+    return value->_prev;
+  }
+
+  static constexpr std::hash<std::string_view> hasher{};
+
+  static auto
+  hash_of(key_type s) -> decltype(hasher(s))
+  {
+    return hasher(s);
+  }
+};
+
+/**
+ * RolledLogDeleter is responsible for keeping track of rolled log candidates
+ * and presenting them for deletion in a prioritized order: first by each log
+ * type's candidate count relative to its min_count, then by oldest mtime.
+ *
+ * Terminology:
+ *
+ * log type: An unrolled log name that represents a category of rolled log
+ * files that are candidates for deletion. This may be something like
+ * diags.log, traffic.out, etc.
+ *
+ * candidate: A rolled log file which is a candidate for deletion at some
+ * point. This may be something like:
+ *   squid.log_some.hostname.com.20191125.19h00m04s-20191125.19h15m04s.old.
+ */
+class RolledLogDeleter
+{
+public:
+  /** Register a new log type for candidates for log deletion.
+   *
+   * @param[in] log_type The unrolled name for a set of rolled log files to
+   * consider for deletion. This may be something like diags.log, for example.
+   *
+   * @param[in] rolling_min_count The minimum number of rolled log files to
+   * keep around.
+   */
+  void register_log_type_for_deletion(std::string_view log_type, int rolling_min_count);
+
+  /** Evaluate a rolled log file to see whether it is a candidate for deletion.
+   *
+   * If the rolled log file is a valid candidate, it will be stored and considered
+   * for deletion upon later calls to take_next_candidate_to_delete.
+   *
+   * @param[in] log_path The rolled log file path.
+   *
+   * @param[in] file_size The size of the rolled log file.
+   *
+   * @param[in] modification_time The time the rolled log file was last
+   * modified.
+   *
+   * @return True if the rolled log file is a deletion candidate, false otherwise.
+   */
+  bool consider_for_candidacy(std::string_view log_path, int64_t file_size, time_t modification_time);
+
+  /** Retrieve the next rolled log file to delete.
+   *
+   * This removes the returned rolled file from the candidates list.
+   *
+   * @return The next rolled log candidate to delete or nullptr if there is no
+   * such candidate.
+   */
+  std::unique_ptr<LogDeleteCandidate> take_next_candidate_to_delete();
+
+  /** Whether there are any candidates for possible deletion.
+   *
+   * @return True if there are candidates for deletion, false otherwise.
+   */
+  bool has_candidates() const;
+
+  /** Retrieve the number of rolled log deletion candidates.
+   *
+   * @return The number of rolled logs that are candidates for deletion.
+   */
+  size_t get_candidate_count() const;
+
+  /** Clear the internal candidates array.
+   */
+  void clear_candidates();
+
+private:
+  /** The owning references to the set of LogDeletingInfo added to the below
+   * hash map. */
+  std::list<std::unique_ptr<LogDeletingInfo>> deletingInfoList;
+
+  /** The set of candidates for deletion keyed by log_type. */
+  IntrusiveHashMap<LogDeletingInfoDescriptor> deleting_info;
+
+  /** The number of tracked candidates. */
+  size_t num_candidates = 0;
+};
diff --git a/proxy/logging/YamlLogConfig.cc b/proxy/logging/YamlLogConfig.cc
index 25cefe9..5780aa6 100644
--- a/proxy/logging/YamlLogConfig.cc
+++ b/proxy/logging/YamlLogConfig.cc
@@ -109,10 +109,19 @@ TsEnumDescriptor ROLLING_MODE_TEXT = {{{"none", 0}, {"time", 1}, {"size", 2}, {"
 TsEnumDescriptor ROLLING_MODE_LUA  = {
   {{"log.roll.none", 0}, {"log.roll.time", 1}, {"log.roll.size", 2}, {"log.roll.both", 3}, {"log.roll.any", 4}}};
 
-std::set<std::string> valid_log_object_keys = {
-  "filename",          "format",          "mode",    "header",    "rolling_enabled",   "rolling_interval_sec",
-  "rolling_offset_hr", "rolling_size_mb", "filters", "min_count", "rolling_max_count", "rolling_allow_empty",
-  "pipe_buffer_size"};
+std::set<std::string> valid_log_object_keys = {"filename",
+                                               "format",
+                                               "mode",
+                                               "header",
+                                               "rolling_enabled",
+                                               "rolling_interval_sec",
+                                               "rolling_offset_hr",
+                                               "rolling_size_mb",
+                                               "filters",
+                                               "rolling_min_count",
+                                               "rolling_max_count",
+                                               "rolling_allow_empty",
+                                               "pipe_buffer_size"};
 
 LogObject *
 YamlLogConfig::decodeLogObject(const YAML::Node &node)
@@ -158,7 +167,7 @@ YamlLogConfig::decodeLogObject(const YAML::Node &node)
   int obj_rolling_interval_sec = cfg->rolling_interval_sec;
   int obj_rolling_offset_hr    = cfg->rolling_offset_hr;
   int obj_rolling_size_mb      = cfg->rolling_size_mb;
-  int obj_min_count            = cfg->rolling_min_count;
+  int obj_rolling_min_count    = cfg->rolling_min_count;
   int obj_rolling_max_count    = cfg->rolling_max_count;
   int obj_rolling_allow_empty  = cfg->rolling_allow_empty;
 
@@ -184,8 +193,8 @@ YamlLogConfig::decodeLogObject(const YAML::Node &node)
   if (node["rolling_size_mb"]) {
     obj_rolling_size_mb = node["rolling_size_mb"].as<int>();
   }
-  if (node["min_count"]) {
-    obj_min_count = node["min_count"].as<int>();
+  if (node["rolling_min_count"]) {
+    obj_rolling_min_count = node["rolling_min_count"].as<int>();
   }
   if (node["rolling_max_count"]) {
     obj_rolling_max_count = node["rolling_max_count"].as<int>();
@@ -210,7 +219,7 @@ YamlLogConfig::decodeLogObject(const YAML::Node &node)
   auto logObject = new LogObject(fmt, Log::config->logfile_dir, filename.c_str(), file_type, header.c_str(),
                                  static_cast<Log::RollingEnabledValues>(obj_rolling_enabled), Log::config->preproc_threads,
                                  obj_rolling_interval_sec, obj_rolling_offset_hr, obj_rolling_size_mb, /* auto_created */ false,
-                                 /* rolling_max_count */ obj_rolling_max_count,
+                                 /* rolling_max_count */ obj_rolling_max_count, /* rolling_min_count */ obj_rolling_min_count,
                                  /* reopen_after_rolling */ obj_rolling_allow_empty > 0, pipe_buffer_size);
 
   // Generate LogDeletingInfo entry for later use
@@ -228,7 +237,6 @@ YamlLogConfig::decodeLogObject(const YAML::Node &node)
   default:
     break;
   }
-  cfg->deleting_info.insert(new LogDeletingInfo(filename + ext, ((obj_min_count == 0) ? INT_MAX : obj_min_count)));
 
   // filters
   auto filters = node["filters"];
diff --git a/proxy/logging/unit-tests/test_LogUtils.h b/proxy/logging/unit-tests/test_LogUtils.h
index 3c9ed22..c5b4da9 100644
--- a/proxy/logging/unit-tests/test_LogUtils.h
+++ b/proxy/logging/unit-tests/test_LogUtils.h
@@ -23,6 +23,8 @@
 
 #pragma once
 
+#include <cstring>
+
 struct MIMEField {
   const char *tag, *value;
 
diff --git a/proxy/logging/unit-tests/test_RolledLogDeleter.cc b/proxy/logging/unit-tests/test_RolledLogDeleter.cc
new file mode 100644
index 0000000..884030d
--- /dev/null
+++ b/proxy/logging/unit-tests/test_RolledLogDeleter.cc
@@ -0,0 +1,319 @@
+/** @file
+
+  Catch-based tests for RolledLogDeleter.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include <RolledLogDeleter.h>
+
+#include "tscore/ts_file.h"
+
+#define CATCH_CONFIG_MAIN
+#include "catch.hpp"
+
+namespace fs = ts::file;
+
+const fs::path log_dir("/home/y/logs/trafficserver");
+
+void
+verify_there_are_no_candidates(RolledLogDeleter &deleter)
+{
+  CHECK_FALSE(deleter.has_candidates());
+  CHECK(deleter.get_candidate_count() == 0);
+}
+
+void
+verify_rolled_log_behavior(RolledLogDeleter &deleter, fs::path rolled_log1, fs::path rolled_log2, fs::path rolled_log3)
+{
+  SECTION("Verify we can add a single rolled files")
+  {
+    constexpr int64_t file_size    = 100;
+    constexpr time_t last_modified = 30;
+
+    REQUIRE(deleter.consider_for_candidacy(rolled_log1.string(), file_size, last_modified));
+
+    CHECK(deleter.has_candidates());
+    CHECK(deleter.get_candidate_count() == 1);
+
+    const auto next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_log1.string());
+
+    // Everything has been taken.
+    verify_there_are_no_candidates(deleter);
+  }
+
+  SECTION("Verify we can add two rolled log files")
+  {
+    constexpr int64_t file_size         = 100;
+    constexpr time_t oldest_timestamp   = 30;
+    constexpr time_t youngest_timestamp = 60;
+
+    // Intentionally insert them out of order (that is, the first one to delete
+    // is the second one added).
+    REQUIRE(deleter.consider_for_candidacy(rolled_log2.string(), file_size, youngest_timestamp));
+    REQUIRE(deleter.consider_for_candidacy(rolled_log1.string(), file_size, oldest_timestamp));
+
+    CHECK(deleter.has_candidates());
+    CHECK(deleter.get_candidate_count() == 2);
+
+    // The first candidate should be the oldest modified one.
+    auto next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_log1.string());
+
+    CHECK(deleter.has_candidates());
+    CHECK(deleter.get_candidate_count() == 1);
+
+    // The second candidate should be the remaining one.
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_log2.string());
+
+    // Everything has been taken.
+    verify_there_are_no_candidates(deleter);
+  }
+
+  SECTION("Verify we can add three rolled log files")
+  {
+    constexpr int64_t file_size = 100;
+
+    constexpr time_t oldest_timestamp   = 30;
+    constexpr time_t youngest_timestamp = 60;
+    constexpr time_t middle_timestamp   = 45;
+
+    // Intentionally insert them out of order.
+    REQUIRE(deleter.consider_for_candidacy(rolled_log2.string(), file_size, youngest_timestamp));
+    REQUIRE(deleter.consider_for_candidacy(rolled_log1.string(), file_size, oldest_timestamp));
+    REQUIRE(deleter.consider_for_candidacy(rolled_log3.string(), file_size, middle_timestamp));
+
+    CHECK(deleter.has_candidates());
+    CHECK(deleter.get_candidate_count() == 3);
+
+    // The first candidate should be the oldest modified one.
+    auto next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_log1.string());
+
+    CHECK(deleter.has_candidates());
+    CHECK(deleter.get_candidate_count() == 2);
+
+    // The second candidate should be the second oldest.
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_log3.string());
+
+    CHECK(deleter.has_candidates());
+    CHECK(deleter.get_candidate_count() == 1);
+
+    // The third candidate should be the remaining one.
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_log2.string());
+
+    // Everything has been taken.
+    verify_there_are_no_candidates(deleter);
+  }
+}
+
+TEST_CASE("Rotated diags logs can be added and removed", "[RolledLogDeleter]")
+{
+  RolledLogDeleter deleter;
+  constexpr auto min_count = 0;
+  deleter.register_log_type_for_deletion("diags.log", min_count);
+
+  const fs::path rolled_log1 = log_dir / "diags.log.20191117.16h43m15s-20191118.16h43m15s.old";
+  const fs::path rolled_log2 = log_dir / "diags.log.20191118.16h43m15s-20191122.04h07m09s.old";
+  const fs::path rolled_log3 = log_dir / "diags.log.20191122.04h07m09s-20191124.00h12m47s.old";
+
+  verify_there_are_no_candidates(deleter);
+  verify_rolled_log_behavior(deleter, rolled_log1, rolled_log2, rolled_log3);
+}
+
+TEST_CASE("Rotated squid logs can be added and removed", "[RolledLogDeleter]")
+{
+  RolledLogDeleter deleter;
+  constexpr auto min_count = 0;
+  deleter.register_log_type_for_deletion("squid.log", min_count);
+  const fs::path rolled_log1 = log_dir / "squid.log_some.hostname.com.20191125.19h00m04s-20191125.19h15m04s.old";
+  const fs::path rolled_log2 = log_dir / "squid.log_some.hostname.com.20191125.19h15m04s-20191125.19h30m04s.old";
+  const fs::path rolled_log3 = log_dir / "squid.log_some.hostname.com.20191125.19h30m04s-20191125.19h45m04s.old";
+
+  verify_there_are_no_candidates(deleter);
+  verify_rolled_log_behavior(deleter, rolled_log1, rolled_log2, rolled_log3);
+}
+
+TEST_CASE("clear removes all candidates", "[RolledLogDeleter]")
+{
+  RolledLogDeleter deleter;
+  constexpr auto min_count = 0;
+  deleter.register_log_type_for_deletion("squid.log", min_count);
+  deleter.register_log_type_for_deletion("diags.log", min_count);
+
+  constexpr auto size         = 10;
+  constexpr time_t time_stamp = 20;
+
+  // Add some candidates.
+  REQUIRE(deleter.consider_for_candidacy("squid.log_arbitrary-text-1", size, time_stamp));
+  REQUIRE(deleter.consider_for_candidacy("squid.log_arbitrary-text-2", size, time_stamp));
+  REQUIRE(deleter.consider_for_candidacy("squid.log_arbitrary-text-3", size, time_stamp));
+
+  REQUIRE(deleter.consider_for_candidacy("diags.log.arbitrary-text-1", size, time_stamp));
+  REQUIRE(deleter.consider_for_candidacy("diags.log.arbitrary-text-2", size, time_stamp));
+  REQUIRE(deleter.consider_for_candidacy("diags.log.arbitrary-text-3", size, time_stamp));
+
+  REQUIRE(deleter.has_candidates());
+  REQUIRE(deleter.get_candidate_count() == 6);
+
+  deleter.clear_candidates();
+  verify_there_are_no_candidates(deleter);
+}
+
+TEST_CASE("verify priority enforcement", "[RolledLogDeleter]")
+{
+  RolledLogDeleter deleter;
+
+  constexpr auto low_min_count     = 1;
+  constexpr auto medium_min_count  = 3;
+  constexpr auto highest_min_count = 0;
+
+  constexpr int64_t a_size = 10;
+  constexpr time_t a_time  = 30;
+
+  deleter.register_log_type_for_deletion("squid.log", low_min_count);
+  deleter.register_log_type_for_deletion("traffic.out", medium_min_count);
+  deleter.register_log_type_for_deletion("diags.log", highest_min_count);
+
+  /* The previous tests verify selection within a log_type which is done based
+   * upon last modified time stamp. These tests focus on selection of
+   * candidates across log types, which is based upon number of candidates and
+   * the desired min_count. */
+  SECTION("Verify selection of a candidate when there is only one.")
+  {
+    const fs::path rolled_squid = log_dir / "squid.log_some.hostname.com.20191125.19h00m04s-20191125.19h15m04s.old";
+    REQUIRE(deleter.consider_for_candidacy(rolled_squid.view(), a_size, a_time));
+    const auto next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_squid.string());
+    verify_there_are_no_candidates(deleter);
+  }
+
+  SECTION("Verify selection of candidates across three types.")
+  {
+    const fs::path rolled_squid   = log_dir / "squid.log_some.hostname.com.20191125.19h00m04s-20191125.19h15m04s.old";
+    const fs::path rolled_traffic = log_dir / "traffic.out.20191118.16h43m11s-20191122.01h30m30s.old";
+    const fs::path rolled_diags   = log_dir / "diags.log.20191117.16h43m15s-20191118.16h43m15s.old";
+
+    REQUIRE(deleter.consider_for_candidacy(rolled_squid.view(), a_size, a_time));
+    REQUIRE(deleter.consider_for_candidacy(rolled_traffic.view(), a_size, a_time));
+    REQUIRE(deleter.consider_for_candidacy(rolled_diags.view(), a_size, a_time));
+
+    // Since the time stamps of all three are the same, selection should be
+    // made based upon min_count.
+    auto next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_squid.string());
+
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_traffic.string());
+
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_diags.string());
+
+    verify_there_are_no_candidates(deleter);
+  }
+
+  SECTION("Verify that number of candidates is taken into account.")
+  {
+    const fs::path rolled_squid    = log_dir / "squid.log_some.hostname.com.20191125.19h00m04s-20191125.19h15m04s.old";
+    const fs::path rolled_traffic1 = log_dir / "traffic.out.20191117.16h43m15s-20191118.16h43m15s.old";
+    const fs::path rolled_traffic2 = log_dir / "traffic.out.20191118.16h43m15s-20191122.04h07m09s.old";
+    const fs::path rolled_traffic3 = log_dir / "traffic.out.20191122.04h07m09s-20191124.00h12m47s.old";
+    const fs::path rolled_traffic4 = log_dir / "traffic.out.20191124.00h12m44s-20191125.00h12m44s.old";
+
+    constexpr time_t old       = 60;
+    constexpr time_t older     = 30;
+    constexpr time_t oldest    = 10;
+    constexpr time_t oldestest = 5;
+
+    REQUIRE(deleter.consider_for_candidacy(rolled_squid.view(), a_size, a_time));
+    REQUIRE(deleter.consider_for_candidacy(rolled_traffic1.view(), a_size, old));
+    REQUIRE(deleter.consider_for_candidacy(rolled_traffic2.view(), a_size, older));
+    REQUIRE(deleter.consider_for_candidacy(rolled_traffic3.view(), a_size, oldest));
+    REQUIRE(deleter.consider_for_candidacy(rolled_traffic4.view(), a_size, oldestest));
+
+    // The user has requested a higher number of traffic.out files, but since
+    // there are so many of them, the oldest of them should be selected next.
+    auto next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_traffic4.string());
+
+    // Next, squid.log should be chosen.
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_squid.string());
+
+    // Now, there's only traffic.out files.
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_traffic3.string());
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_traffic2.string());
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_traffic1.string());
+
+    verify_there_are_no_candidates(deleter);
+  }
+
+  SECTION("A mincount of 0 should shield from deletion as much as possible")
+  {
+    const fs::path rolled_traffic = log_dir / "traffic.out.20191117.16h43m15s-20191118.16h43m15s.old";
+    const fs::path rolled_diags1  = log_dir / "diags.log.20191117.16h43m15s-20191118.16h43m15s.old";
+    const fs::path rolled_diags2  = log_dir / "diags.log.20191118.16h43m15s-20191122.04h07m09s.old";
+    const fs::path rolled_diags3  = log_dir / "diags.log.20191122.04h07m09s-20191124.00h12m47s.old";
+    const fs::path rolled_diags4  = log_dir / "diags.log.20191124.00h12m44s-20191125.00h12m44s.old";
+
+    constexpr time_t old       = 60;
+    constexpr time_t older     = 30;
+    constexpr time_t oldest    = 10;
+    constexpr time_t oldestest = 5;
+
+    REQUIRE(deleter.consider_for_candidacy(rolled_traffic.view(), a_size, a_time));
+    REQUIRE(deleter.consider_for_candidacy(rolled_diags1.view(), a_size, old));
+    REQUIRE(deleter.consider_for_candidacy(rolled_diags2.view(), a_size, older));
+    REQUIRE(deleter.consider_for_candidacy(rolled_diags3.view(), a_size, oldest));
+    REQUIRE(deleter.consider_for_candidacy(rolled_diags4.view(), a_size, oldestest));
+
+    // Even with so many diags.log files, the traffic.out one should be
+    // selected first because the min_count of diags.log is 0.
+    auto next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_traffic.string());
+
+    // Now there's only diags.log files.
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_diags4.string());
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_diags3.string());
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_diags2.string());
+    next_candidate = deleter.take_next_candidate_to_delete();
+    CHECK(next_candidate->rolled_log_path == rolled_diags1.string());
+
+    verify_there_are_no_candidates(deleter);
+  }
+}
+
+//
+// Stub
+//
+void
+RecSignalManager(int, const char *, unsigned long)
+{
+  ink_release_assert(false);
+}
diff --git a/proxy/shared/DiagsConfig.cc b/proxy/shared/DiagsConfig.cc
index 2246dc8..9c5c8be 100644
--- a/proxy/shared/DiagsConfig.cc
+++ b/proxy/shared/DiagsConfig.cc
@@ -245,7 +245,8 @@ DiagsConfig::config_diags_norecords()
 #endif
 }
 
-DiagsConfig::DiagsConfig(const char *prefix_string, const char *filename, const char *tags, const char *actions, bool use_records)
+DiagsConfig::DiagsConfig(std::string_view prefix_string, const char *filename, const char *tags, const char *actions,
+                         bool use_records)
   : callbacks_established(false), diags_log(nullptr), diags(nullptr)
 {
   char diags_logpath[PATH_NAME_MAX];
diff --git a/proxy/shared/DiagsConfig.h b/proxy/shared/DiagsConfig.h
index 76877c9..bccc071 100644
--- a/proxy/shared/DiagsConfig.h
+++ b/proxy/shared/DiagsConfig.h
@@ -32,7 +32,7 @@ struct DiagsConfig {
   void parse_output_string(char *s, DiagsModeOutput *o);
   void register_diags_callbacks();
 
-  DiagsConfig(const char *prefix_string, const char *filename, const char *tags, const char *actions, bool use_records = true);
+  DiagsConfig(std::string_view prefix_string, const char *filename, const char *tags, const char *actions, bool use_records = true);
   ~DiagsConfig();
 
 private:
diff --git a/src/traffic_server/InkAPI.cc b/src/traffic_server/InkAPI.cc
index b508e03..9e8691a 100644
--- a/src/traffic_server/InkAPI.cc
+++ b/src/traffic_server/InkAPI.cc
@@ -7491,7 +7491,7 @@ TSTextLogObjectCreate(const char *filename, int mode, TSTextLogObject *new_objec
   TextLogObject *tlog = new TextLogObject(
     filename, Log::config->logfile_dir, (bool)mode & TS_LOG_MODE_ADD_TIMESTAMP, nullptr, Log::config->rolling_enabled,
     Log::config->preproc_threads, Log::config->rolling_interval_sec, Log::config->rolling_offset_hr, Log::config->rolling_size_mb,
-    Log::config->rolling_max_count, Log::config->rolling_allow_empty);
+    Log::config->rolling_max_count, Log::config->rolling_min_count, Log::config->rolling_allow_empty);
   if (tlog == nullptr) {
     *new_object = nullptr;
     return TS_ERROR;
diff --git a/src/tscore/Diags.cc b/src/tscore/Diags.cc
index 4463d17..5f42bc9 100644
--- a/src/tscore/Diags.cc
+++ b/src/tscore/Diags.cc
@@ -88,15 +88,17 @@ location(const SourceLocation *loc, DiagsShowLocation show, DiagsLevel level)
 //
 //////////////////////////////////////////////////////////////////////////////
 
-Diags::Diags(const char *prefix_string, const char *bdt, const char *bat, BaseLogFile *_diags_log, int dl_perm, int ol_perm)
+Diags::Diags(std::string_view prefix_string, const char *bdt, const char *bat, BaseLogFile *_diags_log, int dl_perm, int ol_perm)
   : diags_log(nullptr),
     stdout_log(nullptr),
     stderr_log(nullptr),
     magic(DIAGS_MAGIC),
     show_location(SHOW_LOCATION_NONE),
     base_debug_tags(nullptr),
-    base_action_tags(nullptr)
+    base_action_tags(nullptr),
+    prefix_str(prefix_string)
 {
+  ink_release_assert(!prefix_str.empty());
   int i;
 
   cleanup_func = nullptr;
@@ -116,11 +118,8 @@ Diags::Diags(const char *prefix_string, const char *bdt, const char *bat, BaseLo
   config.enabled[DiagsTagType_Debug]  = (base_debug_tags != nullptr);
   config.enabled[DiagsTagType_Action] = (base_action_tags != nullptr);
   diags_on_for_plugins                = config.enabled[DiagsTagType_Debug];
-  prefix_str                          = prefix_string;
 
   // The caller must always provide a non-empty prefix.
-  ink_release_assert(prefix_str);
-  ink_release_assert(*prefix_str);
 
   for (i = 0; i < DiagsLevel_Count; i++) {
     config.outputs[i].to_stdout   = false;
diff --git a/tests/gold_tests/logging/log_retention.test.py b/tests/gold_tests/logging/log_retention.test.py
index 7ed68c2..5a3ebde 100644
--- a/tests/gold_tests/logging/log_retention.test.py
+++ b/tests/gold_tests/logging/log_retention.test.py
@@ -17,89 +17,452 @@ Verify correct log retention behavior.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+import os
+
 Test.Summary = '''
 Test the enforcment of proxy.config.log.max_space_mb_for_logs.
 '''
 
-# Create and configure the ATS process.
-ts = Test.MakeATSProcess("ts")
 
-ts.Disk.records_config.update({
-    # Do not accept connections from clients until cache subsystem is operational.
-    'proxy.config.http.wait_for_cache': 1,
+class TestLogRetention:
+    __base_records_config = {
+        # Do not accept connections from clients until cache subsystem is operational.
+        'proxy.config.http.wait_for_cache': 1,
+        'proxy.config.diags.debug.enabled': 1,
+        'proxy.config.diags.debug.tags': 'logspace',
+
+        # Enable log rotation and auto-deletion, the subjects of this test.
+        'proxy.config.log.rolling_enabled': 3,
+        'proxy.config.log.auto_delete_rolled_files': 1,
+
+        # 10 MB is the minimum rolling size.
+        'proxy.config.log.rolling_size_mb': 10,
+        'proxy.config.log.periodic_tasks_interval': 1,
+    }
+
+    __server = None
+    __ts_counter = 1
+    __server_is_started = False
+
+    def __init__(self, records_config, run_description, command="traffic_manager"):
+        """
+        Create a TestLogRetention instance.
+        """
+        self.server = TestLogRetention.__create_server()
+        self.ts = self.__create_ts(records_config, command)
+        self.__initialize_processes()
+        self.tr = Test.AddTestRun(run_description)
+
+    def __initialize_processes(self):
+        """
+        Create a run to initialize the server and traffic_server processes so
+        the caller doesn't have to.
+        """
+        tr = Test.AddTestRun("Initialize processes for ts{}".format(TestLogRetention.__ts_counter - 1))
+        tr.Processes.Default.Command = self.get_curl_command()
+        tr.Processes.Default.ReturnCode = 0
+        if not TestLogRetention.__server_is_started:
+            self.server.StartBefore(self.ts)
+            tr.Processes.Default.StartBefore(self.server)
+            TestLogRetention.__server_is_started = True
+        else:
+            tr.Processes.Default.StartBefore(self.ts)
+
+        tr.StillRunningAfter = self.ts
+        tr.StillRunningAfter = self.server
+
+    @classmethod
+    def __create_server(cls):
+        """
+        Create and return a server process.
+
+        There is only one server process for all the tests. This function is
+        re-entrant, but subsequent calls to it will return the cached version
+        of the single server.
+        """
+        if cls.__server:
+            return cls.__server
+
+        server = Test.MakeOriginServer("server")
+        request_header = {"headers": "GET / HTTP/1.1\r\n"
+                          "Host: does.not.matter\r\n\r\n",
+                          "timestamp": "1469733493.993", "body": ""}
+        response_header = {"headers": "HTTP/1.1 200 OK\r\n"
+                           "Connection: close\r\n"
+                           "Cache-control: max-age=85000\r\n\r\n",
+                           "timestamp": "1469733493.993", "body": "xxx"}
+        server.addResponse("sessionlog.json", request_header, response_header)
+        cls.__server = server
+        return cls.__server
+
+    def __create_ts(self, records_config, command="traffic_manager"):
+        """
+        Create an ATS process.
+
+        records_config: records_config values for this test.
+        command: The ATS process to run for the test.
+        """
+        ts_name = "ts{counter}".format(counter=TestLogRetention.__ts_counter)
+        TestLogRetention.__ts_counter += 1
+        self.ts = Test.MakeATSProcess(ts_name, command=command)
+
+        combined_records_config = TestLogRetention.__base_records_config.copy()
+        combined_records_config.update(records_config)
+        self.ts.Disk.records_config.update(combined_records_config)
 
-    'proxy.config.diags.debug.enabled': 1,
-    'proxy.config.diags.debug.tags': 'logspace',
+        self.ts.Disk.remap_config.AddLine(
+            'map http://127.0.0.1:{0} http://127.0.0.1:{1}'.format(
+                self.ts.Variables.port, self.server.Variables.Port)
+        )
+        return self.ts
 
-    'proxy.config.log.rolling_enabled': 3,
-    'proxy.config.log.auto_delete_rolled_files': 1,
+    def get_curl_command(self):
+        """
+        Generate the appropriate single curl command.
+        """
+        return 'curl "http://127.0.0.1:{0}" --verbose'.format(
+                self.ts.Variables.port)
 
-    # 10 MB is the minimum rolling size.
-    'proxy.config.log.rolling_size_mb': 10,
-    'proxy.config.log.periodic_tasks_interval': 1,
+    def get_command_to_rotate_once(self):
+        """
+        Generate the set of curl commands to trigger a log rotate.
+        """
+        return 'for i in {{1..2500}}; do curl "http://127.0.0.1:{0}" --verbose; done'.format(
+                self.ts.Variables.port)
+
+    def get_command_to_rotate_thrice(self):
+        """
+        Generate the set of curl commands to trigger three log rotations.
+        """
+        return 'for i in {{1..7500}}; do curl "http://127.0.0.1:{0}" --verbose; done'.format(
+                self.ts.Variables.port)
+
+
+#
+# Test 1: Verify that log deletion happens when no min_count is specified.
+#
+twelve_meg_log_space = {
     # The following configures a 12 MB log cap with a required 2 MB head room.
     # Thus the rotated log of just over 10 MB should be deleted because it
     # will not leave enough head room.
     'proxy.config.log.max_space_mb_headroom': 2,
     'proxy.config.log.max_space_mb_for_logs': 12,
-})
+}
+test = TestLogRetention(twelve_meg_log_space,
+                        "Verify log rotation and deletion of the configured log file with no min_count.")
+
+# Configure approximately 5 KB entries for a log with no specified min_count.
+test.ts.Disk.logging_yaml.AddLines(
+    '''
+logging:
+  formats:
+    - name: long
+      format: "{prefix}: %<sssc>"
+  logs:
+    - filename: test_deletion
+      format: long
+'''.format(prefix="0123456789"*500).split("\n")
+)
+
+# Verify that each log type was registered for auto-deletion.
+test.ts.Streams.stderr = Testers.ContainsExpression(
+        "Registering rotated log deletion for test_deletion.log with min roll count 0",
+        "Verify test_deletion.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for error.log with min roll count 0",
+        "Verify error.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for traffic.out with min roll count 0",
+        "Verify traffic.out auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for diags.log with min roll count 0",
+        "Verify diags.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for manager.log with min roll count 0",
+        "Verify manager.log auto-delete configuration")
+# Verify test_deletion was rotated and deleted.
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "The rolled logfile.*test_deletion.log_.*was auto-deleted.*bytes were reclaimed",
+        "Verify that space was reclaimed")
+
+test.tr.Processes.Default.Command = test.get_command_to_rotate_once()
+test.tr.Processes.Default.ReturnCode = 0
+
+test.tr.StillRunningAfter = test.ts
+test.tr.StillRunningAfter = test.server
 
 
-# Configure approximately 5 KB entries.
-ts.Disk.logging_yaml.AddLines(
+#
+# Test 2: Verify log deletion happens with a min_count of 1.
+#
+test = TestLogRetention(twelve_meg_log_space,
+                        "Verify log rotation and deletion of the configured log file with a min_count of 1.")
+
+# Configure approximately 5 KB entries for a log with a rolling_min_count of 1.
+test.ts.Disk.logging_yaml.AddLines(
     '''
 logging:
   formats:
     - name: long
       format: "{prefix}: %<sssc>"
   logs:
-    - filename: test_rotation
+    - filename: test_deletion
+      rolling_min_count: 1
       format: long
 '''.format(prefix="0123456789"*500).split("\n")
 )
 
-# Verify from traffic.out that the rotated log file was auto-deleted.
-ts.Streams.stderr = Testers.ContainsExpression(
-        "logical space used.*space is not available",
-        "It was detected that space was not available")
-ts.Streams.stderr += Testers.ContainsExpression(
-        "auto-deleting.*test_rotation.log",
-        "Verify the test log file got deleted")
-ts.Streams.stderr += Testers.ContainsExpression(
-        "The rolled logfile.*was auto-deleted.*bytes were reclaimed",
+# Verify that each log type was registered for auto-deletion.
+test.ts.Streams.stderr = Testers.ContainsExpression(
+        "Registering rotated log deletion for test_deletion.log with min roll count 1",
+        "Verify test_deletion.log auto-delete configuration")
+# Only the test_deletion should have its min_count overridden.
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for error.log with min roll count 0",
+        "Verify error.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for traffic.out with min roll count 0",
+        "Verify traffic.out auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for diags.log with min roll count 0",
+        "Verify diags.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for manager.log with min roll count 0",
+        "Verify manager.log auto-delete configuration")
+# Verify test_deletion was rotated and deleted.
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "The rolled logfile.*test_deletion.log_.*was auto-deleted.*bytes were reclaimed",
         "Verify that space was reclaimed")
 
-# Create and configure microserver.
-server = Test.MakeOriginServer("server")
-request_header = {"headers": "GET / HTTP/1.1\r\nHost: does.not.matter\r\n\r\n",
-                  "timestamp": "1469733493.993", "body": ""}
-response_header = {"headers": "HTTP/1.1 200 OK\r\nConnection: close\r\nCache-control: max-age=85000\r\n\r\n",
-                   "timestamp": "1469733493.993", "body": "xxx"}
-server.addResponse("sessionlog.json", request_header, response_header)
-ts.Disk.remap_config.AddLine(
-    'map http://127.0.0.1:{0} http://127.0.0.1:{1}'.format(ts.Variables.port, server.Variables.Port)
+test.tr.Processes.Default.Command = test.get_command_to_rotate_once()
+test.tr.Processes.Default.ReturnCode = 0
+test.tr.StillRunningAfter = test.ts
+test.tr.StillRunningAfter = test.server
+
+
+#
+# Test 3: Verify log deletion happens for a plugin's logs.
+#
+test = TestLogRetention(twelve_meg_log_space,
+                        "Verify log rotation and deletion of plugin logs.")
+Test.PreparePlugin(os.path.join(Test.Variables.AtsTestToolsDir, 'plugins', 'test_log_interface.cc'), test.ts)
+
+# Verify that the plugin's logs and other core logs were registered for deletion.
+test.ts.Streams.stderr = Testers.ContainsExpression(
+        "Registering rotated log deletion for test_log_interface.log with min roll count 0",
+        "Verify test_log_interface.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for error.log with min roll count 0",
+        "Verify error.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for traffic.out with min roll count 0",
+        "Verify traffic.out auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for diags.log with min roll count 0",
+        "Verify diags.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for manager.log with min roll count 0",
+        "Verify manager.log auto-delete configuration")
+# Verify the plugin's test_log_interface log was rotated and deleted.
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "The rolled logfile.*test_log_interface.log_.*was auto-deleted.*bytes were reclaimed",
+        "Verify that space was reclaimed")
+
+test.tr.Processes.Default.Command = test.get_command_to_rotate_once()
+test.tr.Processes.Default.ReturnCode = 0
+test.tr.StillRunningAfter = test.ts
+test.tr.StillRunningAfter = test.server
+
+#
+# Test 4: Verify log deletion priority behavior.
+#
+twenty_two_meg_log_space = {
+    # The following configures a 22 MB log cap with a required 2 MB head room.
+    # This should allow enough room for two logs being rotated.
+    'proxy.config.log.max_space_mb_headroom': 2,
+    'proxy.config.log.max_space_mb_for_logs': 22,
+}
+test = TestLogRetention(twenty_two_meg_log_space,
+                        "Verify log deletion priority behavior.")
+
+# Configure approximately 5 KB entries for two logs with different rolling_min_count values.
+test.ts.Disk.logging_yaml.AddLines(
+    '''
+logging:
+  formats:
+    - name: long
+      format: "{prefix}: %<sssc>"
+  logs:
+    - filename: test_low_priority_deletion
+      rolling_min_count: 5
+      format: long
+
+    - filename: test_high_priority_deletion
+      rolling_min_count: 1
+      format: long
+'''.format(prefix="0123456789"*500).split("\n")
+)
+
+# Verify that each log type was registered for auto-deletion.
+test.ts.Streams.stderr = Testers.ContainsExpression(
+        "Registering rotated log deletion for test_low_priority_deletion.log with min roll count 5",
+        "Verify test_low_priority_deletion.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for test_high_priority_deletion.log with min roll count 1",
+        "Verify test_high_priority_deletion.log auto-delete configuration")
+# Only the two test logs above should have their min_count overridden.
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for error.log with min roll count 0",
+        "Verify error.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for traffic.out with min roll count 0",
+        "Verify traffic.out auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for diags.log with min roll count 0",
+        "Verify diags.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for manager.log with min roll count 0",
+        "Verify manager.log auto-delete configuration")
+# Verify that only the high priority (lower min_count) log was rotated and deleted.
+test.ts.Streams.stderr += Testers.ExcludesExpression(
+        "The rolled logfile.*test_low_priority_deletion.log_.*was auto-deleted.*bytes were reclaimed",
+        "Verify that space was reclaimed from test_high_priority_deletion")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "The rolled logfile.*test_high_priority_deletion.log_.*was auto-deleted.*bytes were reclaimed",
+        "Verify that space was reclaimed from test_high_priority_deletion")
+
+test.tr.Processes.Default.Command = test.get_command_to_rotate_once()
+test.tr.Processes.Default.ReturnCode = 0
+test.tr.StillRunningAfter = test.ts
+test.tr.StillRunningAfter = test.server
+
+#
+# Test 5: Verify min_count configuration overrides.
+#
+various_min_count_overrides = {
+    'proxy.config.log.max_space_mb_for_logs': 22,
+    'proxy.config.log.rolling_min_count': 3,
+    'proxy.config.output.logfile.rolling_min_count': 4,
+    'proxy.config.diags.logfile.rolling_min_count': 5,
+}
+test = TestLogRetention(various_min_count_overrides,
+                        "Verify that the various min_count configurations behave as expected")
+
+# Each core log should report the min_count configured for it above.
+test.ts.Streams.stderr = Testers.ContainsExpression(
+        "Registering rotated log deletion for error.log with min roll count 3",
+        "Verify error.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for traffic.out with min roll count 4",
+        "Verify traffic.out auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for diags.log with min roll count 5",
+        "Verify diags.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ContainsExpression(
+        "Registering rotated log deletion for manager.log with min roll count 5",
+        "Verify manager.log auto-delete configuration")
+# In case a future log is added, make sure the developer doesn't forget to
+# set the min count per configuration.
+test.ts.Streams.stderr += Testers.ExcludesExpression(
+        "Registering .* with min roll count 0",
+        "Verify nothing has a default min roll count of 0 per configuration")
+
+# This test doesn't require a log rotation. We just verify that the logs communicate
+# the appropriate min_count values above.
+test.tr.Processes.Default.Command = test.get_curl_command()
+test.tr.Processes.Default.ReturnCode = 0
+test.tr.StillRunningAfter = test.ts
+test.tr.StillRunningAfter = test.server
+
+
+#
+# Test 6: Verify log deletion does not happen when it is disabled.
+#
+auto_delete_disabled = twelve_meg_log_space.copy()
+auto_delete_disabled.update({
+    'proxy.config.log.auto_delete_rolled_files': 0,
+})
+test = TestLogRetention(auto_delete_disabled,
+                        "Verify log deletion does not happen when auto-delet is disabled.")
+
+# Configure approximately 5 KB entries for a log with a rolling_min_count of 1.
+test.ts.Disk.logging_yaml.AddLines(
+    '''
+logging:
+  formats:
+    - name: long
+      format: "{prefix}: %<sssc>"
+  logs:
+    - filename: test_deletion
+      rolling_min_count: 1
+      format: long
+'''.format(prefix="0123456789"*500).split("\n")
 )
 
-# The first test run starts the required processes.
-tr = Test.AddTestRun()
-tr.Processes.Default.Command = 'curl "http://127.1.1.1:{0}" --verbose ; '.format(
-        ts.Variables.port)
-tr.Processes.Default.ReturnCode = 0
-server.StartBefore(Test.Processes.ts)
-tr.Processes.Default.StartBefore(Test.Processes.server)
-
-tr.StillRunningAfter = ts
-tr.StillRunningAfter = server
-
-# With the following test run, we instigate a log rotation via entries from a
-# few thousand curl requests.
-tr = Test.AddTestRun()
-# At 5K a log entry, we need a lot of curl'd requests to get to the 10 MB roll
-# minimum.
-curl_commands = 'for i in {{1..2500}}; do curl "http://127.1.1.1:{0}" --verbose; done'.format(
-    ts.Variables.port)
-tr.Processes.Default.Command = curl_commands
-tr.Processes.Default.ReturnCode = 0
-
-tr.StillRunningAfter = ts
-tr.StillRunningAfter = server
+# Verify that no log type was registered for auto-deletion.
+test.ts.Streams.stderr = Testers.ExcludesExpression(
+        "Registering rotated log deletion for test_deletion.log with min roll count 1",
+        "Verify test_deletion.log auto-delete configuration")
+# The core logs should not have been registered for auto-deletion either.
+test.ts.Streams.stderr += Testers.ExcludesExpression(
+        "Registering rotated log deletion for error.log with min roll count 0",
+        "Verify error.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ExcludesExpression(
+        "Registering rotated log deletion for traffic.out with min roll count 0",
+        "Verify traffic.out auto-delete configuration")
+test.ts.Streams.stderr += Testers.ExcludesExpression(
+        "Registering rotated log deletion for diags.log with min roll count 0",
+        "Verify diags.log auto-delete configuration")
+test.ts.Streams.stderr += Testers.ExcludesExpression(
+        "Registering rotated log deletion for manager.log with min roll count 0",
+        "Verify manager.log auto-delete configuration")
+# Verify test_deletion was not deleted.
+test.ts.Streams.stderr += Testers.ExcludesExpression(
+        "The rolled logfile.*test_deletion.log_.*was auto-deleted.*bytes were reclaimed",
+        "Verify that space was reclaimed")
+
+test.tr.Processes.Default.Command = test.get_command_to_rotate_once()
+test.tr.Processes.Default.ReturnCode = 0
+test.tr.StillRunningAfter = test.ts
+test.tr.StillRunningAfter = test.server
+
+#
+# Test 7: Verify that max_roll_count is respected.
+#
+max_roll_count_of_2 = {
+    'proxy.config.diags.debug.tags': 'log-file',
+
+    # Provide plenty of max_space: we want auto-deletion to happen because of
+    # rolling_max_count, not max_space_mb_for_logs.
+    'proxy.config.log.max_space_mb_headroom': 2,
+    'proxy.config.log.max_space_mb_for_logs': 100,
+
+    # This is the configuration under test.
+    'proxy.config.log.rolling_max_count': 2,
+}
+test = TestLogRetention(max_roll_count_of_2,
+                        "Verify max_roll_count is respected.")
+
+# Configure approximately 5 KB entries for a log with no specified min_count.
+test.ts.Disk.logging_yaml.AddLines(
+    '''
+logging:
+  formats:
+    - name: long
+      format: "{prefix}: %<sssc>"
+  logs:
+    - filename: test_deletion
+      format: long
+'''.format(prefix="0123456789"*500).split("\n")
+)
+
+# Verify that trim happened for the rolled file.
+test.ts.Streams.stderr = Testers.ContainsExpression(
+        "rolled logfile.*test_deletion.log.*old.* was auto-deleted",
+        "Verify test_deletion.log was trimmed")
+
+test.tr.Processes.Default.Command = test.get_command_to_rotate_thrice()
+test.tr.Processes.Default.ReturnCode = 0
+test.tr.StillRunningAfter = test.ts
+test.tr.StillRunningAfter = test.server
+
diff --git a/tests/tools/plugins/test_log_interface.cc b/tests/tools/plugins/test_log_interface.cc
new file mode 100644
index 0000000..b0f44f7
--- /dev/null
+++ b/tests/tools/plugins/test_log_interface.cc
@@ -0,0 +1,97 @@
+/** @file
+
+  Test a plugin's interaction with the logging interface.
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+#include <ts/ts.h>
+#include <string>
+#include <string.h>
+
+constexpr auto plugin_name = "test_log_interface";
+
+static TSTextLogObject pluginlog;
+
+bool
+is_get_request(TSHttpTxn transaction)
+{
+  TSMLoc req_loc;
+  TSMBuffer req_bufp;
+  if (TSHttpTxnClientReqGet(transaction, &req_bufp, &req_loc) == TS_ERROR) {
+    TSError("Error while retrieving client request header\n");
+    return false;
+  }
+  int method_len     = 0;
+  const char *method = TSHttpHdrMethodGet(req_bufp, req_loc, &method_len);
+  if (method_len != (int)strlen(TS_HTTP_METHOD_GET) || strncasecmp(method, TS_HTTP_METHOD_GET, method_len) != 0) {
+    TSHandleMLocRelease(req_bufp, TS_NULL_MLOC, req_loc);
+    return false;
+  }
+  TSHandleMLocRelease(req_bufp, TS_NULL_MLOC, req_loc);
+  return true;
+}
+
+int
+global_handler(TSCont continuation, TSEvent event, void *data)
+{
+  TSHttpSsn session     = static_cast<TSHttpSsn>(data);
+  TSHttpTxn transaction = (TSHttpTxn)data;
+
+  switch (event) {
+  case TS_EVENT_HTTP_READ_REQUEST_HDR:
+    if (is_get_request(transaction)) {
+      const std::string long_line(5000, 's');
+      TSTextLogObjectWrite(pluginlog, "Got a GET request. Writing a long line: %s", long_line.c_str());
+    }
+    TSHttpTxnReenable(transaction, TS_EVENT_HTTP_CONTINUE);
+    return 0;
+
+  default:
+    return 0;
+  }
+
+  TSHttpSsnReenable(session, TS_EVENT_HTTP_CONTINUE);
+
+  return 0;
+}
+
+void
+TSPluginInit(int argc, const char **argv)
+{
+  TSPluginRegistrationInfo info;
+
+  info.plugin_name   = const_cast<char *>(plugin_name);
+  info.support_email = const_cast<char *>("dev@trafficserver.apache.org");
+  info.vendor_name   = const_cast<char *>("Verizon Media");
+
+  TSReturnCode ret;
+#if (TS_VERSION_MAJOR >= 7)
+  ret = TSPluginRegister(&info);
+#else
+  ret = TSPluginRegister(TS_SDK_VERSION_3_0, &info);
+#endif
+
+  if (TS_ERROR == ret) {
+    TSError("[%s] plugin registration failed\n", plugin_name);
+    return;
+  }
+
+  TSAssert(TS_SUCCESS == TSTextLogObjectCreate(plugin_name, TS_LOG_MODE_ADD_TIMESTAMP, &pluginlog));
+  TSHttpHookAdd(TS_HTTP_READ_REQUEST_HDR_HOOK, TSContCreate(global_handler, nullptr));
+}