You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jr...@apache.org on 2021/04/28 14:25:58 UTC

[trafficserver] branch master updated: Updates to Nexthop strategies to limit the number of simultaneous (#7744)

This is an automated email from the ASF dual-hosted git repository.

jrushford pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new f1ea499  Updates to Nexthop strategies to limit the number of simultaneous (#7744)
f1ea499 is described below

commit f1ea4993c178117199f4f53b263736d277ac4a22
Author: John J. Rushford <jr...@apache.org>
AuthorDate: Wed Apr 28 08:25:44 2021 -0600

    Updates to Nexthop strategies to limit the number of simultaneous (#7744)
    
    transactions that will retry a parent once its retry_time window
    has elapsed.  The limiting prevents thundering retries on a parent
    that is still failed and would cause many transactions to hang or
    take much longer than necessary to use a good parent without the
    limiting.  This PR mirrors the limiting added to Parent selection.
    
    The maximum number of retriers is configured with
    'proxy.config.http.parent_proxy.max_trans_retries' which defaults to 2.
---
 proxy/http/remap/NextHopConsistentHash.cc    | 38 ++++++++++++++++------------
 proxy/http/remap/NextHopHealthStatus.cc      | 12 ++++-----
 proxy/http/remap/NextHopRoundRobin.cc        | 23 ++++++++++-------
 proxy/http/remap/NextHopSelectionStrategy.cc | 16 +++++++++---
 proxy/http/remap/NextHopSelectionStrategy.h  | 26 +++++++++++++------
 5 files changed, 72 insertions(+), 43 deletions(-)

diff --git a/proxy/http/remap/NextHopConsistentHash.cc b/proxy/http/remap/NextHopConsistentHash.cc
index c079a8d..aa90ed1 100644
--- a/proxy/http/remap/NextHopConsistentHash.cc
+++ b/proxy/http/remap/NextHopConsistentHash.cc
@@ -302,20 +302,26 @@ NextHopConsistentHash::findNextHop(TSHttpTxn txnp, void *ih, time_t now)
       host_stat = TS_HOST_STATUS_UP;
     }
   }
-  if (!pRec || (pRec && !pRec->available) || host_stat == TS_HOST_STATUS_DOWN) {
+  if (!pRec || (pRec && !pRec->available.load()) || host_stat == TS_HOST_STATUS_DOWN) {
     do {
-      // check if an unavailable server is now retryable, use it if it is.
-      if (pRec && !pRec->available && host_stat == TS_HOST_STATUS_UP) {
-        _now == 0 ? _now = time(nullptr) : _now = now;
+      // check if an unavailable server is now retryable, use it.
+      if (pRec && !pRec->available.load() && host_stat == TS_HOST_STATUS_UP) {
         // check if the host is retryable.  It's retryable if the retry window has elapsed
-        if ((pRec->failedAt + retry_time) < static_cast<unsigned>(_now)) {
-          nextHopRetry        = true;
-          result->last_parent = pRec->host_index;
-          result->last_lookup = pRec->group_index;
-          result->retry       = nextHopRetry;
-          result->result      = PARENT_SPECIFIED;
-          NH_Debug(NH_DEBUG_TAG, "[%" PRIu64 "] next hop %s is now retryable, marked it available.", sm_id, pRec->hostname.c_str());
-          break;
+        _now == 0 ? _now = time(nullptr) : _now = now;
+        if ((pRec->failedAt.load() + retry_time) < static_cast<unsigned>(_now)) {
+          if (pRec->retriers.fetch_add(1, std::memory_order_relaxed) < max_retriers) {
+            nextHopRetry        = true;
+            result->last_parent = pRec->host_index;
+            result->last_lookup = pRec->group_index;
+            result->retry       = nextHopRetry;
+            result->result      = PARENT_SPECIFIED;
+            NH_Debug(NH_DEBUG_TAG,
+                     "[%" PRIu64 "] next hop %s is now retryable, marked it available, retriers: %d, max_retriers: %d.", sm_id,
+                     pRec->hostname.c_str(), pRec->retriers.load(), max_retriers);
+            break;
+          } else {
+            pRec->retriers--;
+          }
         }
       }
       switch (ring_mode) {
@@ -350,9 +356,9 @@ NextHopConsistentHash::findNextHop(TSHttpTxn txnp, void *ih, time_t now)
           }
         }
         NH_Debug(NH_DEBUG_TAG, "[%" PRIu64 "] Selected a new parent: %s, available: %s, wrapped: %s, lookups: %d.", sm_id,
-                 pRec->hostname.c_str(), (pRec->available) ? "true" : "false", (wrapped) ? "true" : "false", lookups);
+                 pRec->hostname.c_str(), (pRec->available.load()) ? "true" : "false", (wrapped) ? "true" : "false", lookups);
         // use available host.
-        if (pRec->available && host_stat == TS_HOST_STATUS_UP) {
+        if (pRec->available.load() && host_stat == TS_HOST_STATUS_UP) {
           break;
         }
       } else {
@@ -371,14 +377,14 @@ NextHopConsistentHash::findNextHop(TSHttpTxn txnp, void *ih, time_t now)
         }
         break;
       }
-    } while (!pRec || (pRec && !pRec->available) || host_stat == TS_HOST_STATUS_DOWN);
+    } while (!pRec || (pRec && !pRec->available.load()) || host_stat == TS_HOST_STATUS_DOWN);
   }
 
   // ----------------------------------------------------------------------------------------------------
   // Validate and return the final result.
   // ----------------------------------------------------------------------------------------------------
 
-  if (pRec && host_stat == TS_HOST_STATUS_UP && (pRec->available || result->retry)) {
+  if (pRec && host_stat == TS_HOST_STATUS_UP && (pRec->available.load() || result->retry)) {
     result->result      = PARENT_SPECIFIED;
     result->hostname    = pRec->hostname.c_str();
     result->last_parent = pRec->host_index;
diff --git a/proxy/http/remap/NextHopHealthStatus.cc b/proxy/http/remap/NextHopHealthStatus.cc
index afb43c7..9dad0cb 100644
--- a/proxy/http/remap/NextHopHealthStatus.cc
+++ b/proxy/http/remap/NextHopHealthStatus.cc
@@ -57,7 +57,7 @@ NextHopHealthStatus::isNextHopAvailable(TSHttpTxn txn, const char *hostname, con
   }
 
   std::shared_ptr p = iter->second;
-  return p->available;
+  return p->available.load();
 }
 
 /**
@@ -103,17 +103,17 @@ NextHopHealthStatus::markNextHop(TSHttpTxn txn, const char *hostname, const int
   switch (status) {
   // Mark the host up.
   case NH_MARK_UP:
-    if (!h->available) {
+    if (!h->available.load()) {
       h->set_available();
       NH_Note("[%" PRId64 "] http parent proxy %s restored", sm_id, hostname);
     }
     break;
   // Mark the host down.
   case NH_MARK_DOWN:
-    if (h->failedAt == 0 || result.retry == true) {
+    if (h->failedAt.load() == 0 || result.retry == true) {
       { // lock guard
         std::lock_guard<std::mutex> guard(h->_mutex);
-        if (h->failedAt == 0) {
+        if (h->failedAt.load() == 0) {
           h->failedAt = _now;
           if (result.retry == false) {
             new_fail_count = h->failCount = 1;
@@ -128,7 +128,7 @@ NextHopHealthStatus::markNextHop(TSHttpTxn txn, const char *hostname, const int
       // if the last failure was outside the retry window, set the failcount to 1 and failedAt to now.
       { // lock guard
         std::lock_guard<std::mutex> lock(h->_mutex);
-        if ((h->failedAt + retry_time) < static_cast<unsigned>(_now)) {
+        if ((h->failedAt.load() + retry_time) < static_cast<unsigned>(_now)) {
           h->failCount = 1;
           h->failedAt  = _now;
         } else {
@@ -144,7 +144,7 @@ NextHopHealthStatus::markNextHop(TSHttpTxn txn, const char *hostname, const int
       NH_Note("[%" PRId64 "] Failure threshold met failcount:%d >= threshold:%" PRId64 ", http parent proxy %s marked down", sm_id,
               new_fail_count, fail_threshold, h->hostname.c_str());
       NH_Debug(NH_DEBUG_TAG, "[%" PRId64 "] NextHop %s marked unavailable, h->available=%s", sm_id, h->hostname.c_str(),
-               (h->available) ? "true" : "false");
+               (h->available.load()) ? "true" : "false");
     }
     break;
   }
diff --git a/proxy/http/remap/NextHopRoundRobin.cc b/proxy/http/remap/NextHopRoundRobin.cc
index 3b1afe8..9097a70 100644
--- a/proxy/http/remap/NextHopRoundRobin.cc
+++ b/proxy/http/remap/NextHopRoundRobin.cc
@@ -134,15 +134,15 @@ NextHopRoundRobin::findNextHop(TSHttpTxn txnp, void *ih, time_t now)
     NH_Debug(NH_DEBUG_TAG,
              "[%" PRIu64 "] Selected a parent, %s,  failCount (faileAt: %d failCount: %d), FailThreshold: %" PRIu64
              ", request_info->xact_start: %ld",
-             sm_id, cur_host->hostname.c_str(), (unsigned)cur_host->failedAt, cur_host->failCount, fail_threshold,
+             sm_id, cur_host->hostname.c_str(), (unsigned)cur_host->failedAt, cur_host->failCount.load(), fail_threshold,
              request_info.xact_start);
     // check if 'cur_host' is available, mark it up if it is.
-    if ((cur_host->failedAt == 0) || (cur_host->failCount < fail_threshold)) {
-      if (host_stat == TS_HOST_STATUS_UP) {
+    if ((cur_host->failedAt == 0) || (cur_host->failCount.load() < fail_threshold)) {
+      if (cur_host->available.load() && host_stat == TS_HOST_STATUS_UP) {
         NH_Debug(NH_DEBUG_TAG,
                  "[%" PRIu64
                  "] Selecting a parent, %s,  due to little failCount (faileAt: %d failCount: %d), FailThreshold: %" PRIu64,
-                 sm_id, cur_host->hostname.c_str(), (unsigned)cur_host->failedAt, cur_host->failCount, fail_threshold);
+                 sm_id, cur_host->hostname.c_str(), (unsigned)cur_host->failedAt, cur_host->failCount.load(), fail_threshold);
         parentUp = true;
       }
     } else { // if not available, check to see if it can be retried.  If so, set the retry flag and temporairly mark it as
@@ -150,11 +150,16 @@ NextHopRoundRobin::findNextHop(TSHttpTxn txnp, void *ih, time_t now)
       _now == 0 ? _now = time(nullptr) : _now = now;
       if (((result->wrap_around) || (cur_host->failedAt + retry_time) < static_cast<unsigned>(_now)) &&
           host_stat == TS_HOST_STATUS_UP) {
-        // Reuse the parent
-        parentUp    = true;
-        parentRetry = true;
-        NH_Debug(NH_DEBUG_TAG, "[%" PRIu64 "]  NextHop marked for retry %s:%d", sm_id, cur_host->hostname.c_str(),
-                 host_groups[cur_grp_index][cur_hst_index]->getPort(scheme));
+        if (cur_host->retriers.fetch_add(1, std::memory_order_relaxed) < max_retriers) {
+          // Reuse the parent
+          parentUp    = true;
+          parentRetry = true;
+          NH_Debug(NH_DEBUG_TAG, "[%" PRIu64 "]  NextHop marked for retry %s:%d, max_retriers: %d, retriers: %d", sm_id,
+                   cur_host->hostname.c_str(), host_groups[cur_grp_index][cur_hst_index]->getPort(scheme), max_retriers,
+                   cur_host->retriers.load());
+        } else {
+          cur_host->retriers--;
+        }
       } else { // not retryable or available.
         parentUp = false;
       }
diff --git a/proxy/http/remap/NextHopSelectionStrategy.cc b/proxy/http/remap/NextHopSelectionStrategy.cc
index e21168c..6c5b6dd 100644
--- a/proxy/http/remap/NextHopSelectionStrategy.cc
+++ b/proxy/http/remap/NextHopSelectionStrategy.cc
@@ -39,9 +39,17 @@ constexpr const char *policy_strings[] = {"NH_UNDEFINED", "NH_FIRST_LIVE", "NH_R
 
 NextHopSelectionStrategy::NextHopSelectionStrategy(const std::string_view &name, const NHPolicyType &policy)
 {
-  strategy_name = name;
-  policy_type   = policy;
-  NH_Debug(NH_DEBUG_TAG, "Using a selection strategy of type %s", policy_strings[policy]);
+  int _max_retriers = 0;
+  strategy_name     = name;
+  policy_type       = policy;
+  REC_ReadConfigInteger(_max_retriers, "proxy.config.http.parent_proxy.max_trans_retries");
+
+  // config settings may not be available when running unit tests.
+  // so use the max_retriers default setting.
+  if (_max_retriers > 0) {
+    max_retriers = _max_retriers;
+  }
+  NH_Debug(NH_DEBUG_TAG, "Using a selection strategy of type %s, max_retriers: %d", policy_strings[policy], max_retriers);
 }
 
 //
@@ -210,7 +218,7 @@ NextHopSelectionStrategy::nextHopExists(TSHttpTxn txnp, void *ih)
   for (uint32_t gg = 0; gg < groups; gg++) {
     for (auto &hh : host_groups[gg]) {
       HostRecord *p = hh.get();
-      if (p->available) {
+      if (p->available.load()) {
         NH_Debug(NH_DEBUG_TAG, "[%" PRIu64 "] found available next hop %s", sm_id, p->hostname.c_str());
         return true;
       }
diff --git a/proxy/http/remap/NextHopSelectionStrategy.h b/proxy/http/remap/NextHopSelectionStrategy.h
index 64bf489..84c7275 100644
--- a/proxy/http/remap/NextHopSelectionStrategy.h
+++ b/proxy/http/remap/NextHopSelectionStrategy.h
@@ -101,14 +101,15 @@ struct NHProtocol {
 struct HostRecord : ATSConsistentHashNode {
   std::mutex _mutex;
   std::string hostname;
-  time_t failedAt;
-  uint32_t failCount;
-  time_t upAt;
+  std::atomic<time_t> failedAt;
+  std::atomic<uint32_t> failCount;
+  std::atomic<time_t> upAt;
   float weight;
   std::string hash_string;
   int host_index;
   int group_index;
   std::vector<std::shared_ptr<NHProtocol>> protocols;
+  std::atomic<int> retriers = 0;
 
   // construct without locking the _mutex.
   HostRecord()
@@ -122,21 +123,23 @@ struct HostRecord : ATSConsistentHashNode {
     host_index  = -1;
     group_index = -1;
     available   = true;
+    retriers    = 0;
   }
 
   // copy constructor to avoid copying the _mutex.
   HostRecord(const HostRecord &o)
   {
     hostname    = o.hostname;
-    failedAt    = o.failedAt;
-    failCount   = o.failCount;
-    upAt        = o.upAt;
+    failedAt    = o.failedAt.load();
+    failCount   = o.failCount.load();
+    upAt        = o.upAt.load();
     weight      = o.weight;
     hash_string = o.hash_string;
     host_index  = -1;
     group_index = -1;
     available   = true;
     protocols   = o.protocols;
+    retriers    = o.available.load();
   }
 
   // assign without copying the _mutex.
@@ -144,14 +147,16 @@ struct HostRecord : ATSConsistentHashNode {
   operator=(const HostRecord &o)
   {
     hostname    = o.hostname;
-    failedAt    = o.failedAt;
-    upAt        = o.upAt;
+    failedAt    = o.failedAt.load();
+    failCount   = o.failCount.load();
+    upAt        = o.upAt.load();
     weight      = o.weight;
     hash_string = o.hash_string;
     host_index  = o.host_index;
     group_index = o.group_index;
     available   = o.available.load();
     protocols   = o.protocols;
+    retriers    = o.retriers.load();
     return *this;
   }
 
@@ -163,6 +168,9 @@ struct HostRecord : ATSConsistentHashNode {
       std::lock_guard<std::mutex> lock(_mutex);
       failedAt  = time(nullptr);
       available = false;
+      if (--retriers < 0) {
+        retriers = 0;
+      }
     }
   }
 
@@ -174,6 +182,7 @@ struct HostRecord : ATSConsistentHashNode {
       std::lock_guard<std::mutex> lock(_mutex);
       failedAt  = 0;
       failCount = 0;
+      retriers  = 0;
       upAt      = time(nullptr);
       available = true;
     }
@@ -250,4 +259,5 @@ public:
   uint32_t hst_index          = 0;
   uint32_t num_parents        = 0;
   uint32_t distance           = 0; // index into the strategies list.
+  int max_retriers            = 1;
 };