You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by du...@apache.org on 2021/02/19 23:06:39 UTC

[trafficserver] branch master updated: parse expiration time and reload config at time out (#7281)

This is an automated email from the ASF dual-hosted git repository.

duke8253 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new b777c92  parse expiration time and reload config at time out (#7281)
b777c92 is described below

commit b777c92acea5b7f280c15d5b048c1d03a5b7e608
Author: Fei Deng <du...@gmail.com>
AuthorDate: Fri Feb 19 17:05:55 2021 -0600

    parse expiration time and reload config at time out (#7281)
---
 plugins/s3_auth/s3_auth.cc | 248 +++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 226 insertions(+), 22 deletions(-)

diff --git a/plugins/s3_auth/s3_auth.cc b/plugins/s3_auth/s3_auth.cc
index 696fcab..5b381ab 100644
--- a/plugins/s3_auth/s3_auth.cc
+++ b/plugins/s3_auth/s3_auth.cc
@@ -37,6 +37,12 @@
 #include <openssl/sha.h>
 #include <openssl/hmac.h>
 
+#include <chrono>
+#include <atomic>
+#include <thread>
+#include <mutex>
+#include <shared_mutex>
+
 #include <ts/ts.h>
 #include <ts/remap.h>
 #include "tscore/ink_config.h"
@@ -143,7 +149,31 @@ public:
   S3Config *get(const char *fname);
 
 private:
-  std::unordered_map<std::string, std::pair<S3Config *, int>> _cache;
+  struct _ConfigData {
+    // This is incremented before and after cnf and load_time are set.
+    // Thus, an odd value indicates an update is in progress.
+    std::atomic<unsigned> update_status{0};
+
+    // A config from a file and the last time it was loaded.
+    // config should be written before load_time.  That way,
+    // if config is read after load_time, the load time will
+    // never indicate config is fresh when it isn't.
+    std::atomic<S3Config *> config;
+    std::atomic<time_t> load_time;
+
+    _ConfigData() {}
+
+    _ConfigData(S3Config *config_, time_t load_time_) : config(config_), load_time(load_time_) {}
+
+    _ConfigData(_ConfigData &&lhs)
+    {
+      update_status = lhs.update_status.load();
+      config        = lhs.config.load();
+      load_time     = lhs.load_time.load();
+    }
+  };
+
+  std::unordered_map<std::string, _ConfigData> _cache;
   static const int _ttl = 60;
 };
 
@@ -153,6 +183,7 @@ ConfigCache gConfCache;
 // One configuration setup
 //
 int event_handler(TSCont, TSEvent, void *); // Forward declaration
+int config_reloader(TSCont, TSEvent, void *);
 
 class S3Config
 {
@@ -162,6 +193,9 @@ public:
     if (get_cont) {
       _cont = TSContCreate(event_handler, nullptr);
       TSContDataSet(_cont, static_cast<void *>(this));
+
+      _conf_rld = TSContCreate(config_reloader, TSMutexCreate());
+      TSContDataSet(_conf_rld, static_cast<void *>(this));
     }
   }
 
@@ -171,6 +205,13 @@ public:
     TSfree(_secret);
     TSfree(_keyid);
     TSfree(_token);
+    TSfree(_conf_fname);
+    if (_conf_rld_act) {
+      TSActionCancel(_conf_rld_act);
+    }
+    if (_conf_rld) {
+      TSContDestroy(_conf_rld);
+    }
     if (_cont) {
       TSContDestroy(_cont);
     }
@@ -212,16 +253,19 @@ public:
   copy_changes_from(const S3Config *src)
   {
     if (src->_secret) {
+      TSfree(_secret);
       _secret     = TSstrdup(src->_secret);
       _secret_len = src->_secret_len;
     }
 
     if (src->_keyid) {
+      TSfree(_keyid);
       _keyid     = TSstrdup(src->_keyid);
       _keyid_len = src->_keyid_len;
     }
 
     if (src->_token) {
+      TSfree(_token);
       _token     = TSstrdup(src->_token);
       _token_len = src->_token_len;
     }
@@ -250,6 +294,13 @@ public:
       _region_map          = src->_region_map;
       _region_map_modified = true;
     }
+
+    _expiration = src->_expiration;
+
+    if (src->_conf_fname) {
+      TSfree(_conf_fname);
+      _conf_fname = TSstrdup(src->_conf_fname);
+    }
   }
 
   // Getters
@@ -319,6 +370,24 @@ public:
     return _region_map;
   }
 
+  long
+  expiration() const
+  {
+    return _expiration;
+  }
+
+  const char *
+  conf_fname() const
+  {
+    return _conf_fname;
+  }
+
+  int
+  incr_conf_reload_count()
+  {
+    return _conf_reload_count++;
+  }
+
   // Setters
   void
   set_secret(const char *s)
@@ -380,6 +449,25 @@ public:
     _region_map_modified = true;
   }
 
+  void
+  set_expiration(const char *s)
+  {
+    _expiration = strtol(s, nullptr, 10);
+  }
+
+  void
+  set_conf_fname(const char *s)
+  {
+    TSfree(_conf_fname);
+    _conf_fname = TSstrdup(s);
+  }
+
+  void
+  reset_conf_reload_count()
+  {
+    _conf_reload_count = 0;
+  }
+
   // Parse configs from an external file
   bool parse_config(const std::string &filename);
 
@@ -391,6 +479,18 @@ public:
     TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_REQUEST_HDR_HOOK, _cont);
   }
 
+  void
+  schedule_conf_reload(long delay)
+  {
+    if (_conf_rld_act != nullptr && !TSActionDone(_conf_rld_act)) {
+      TSActionCancel(_conf_rld_act);
+    }
+    _conf_rld_act = TSContScheduleOnPool(_conf_rld, delay * 1000, TS_THREAD_POOL_NET);
+  }
+
+  std::shared_mutex reload_mutex;
+  std::atomic_bool reload_waiting = false;
+
 private:
   char *_secret            = nullptr;
   size_t _secret_len       = 0;
@@ -403,12 +503,17 @@ private:
   bool _version_modified   = false;
   bool _virt_host_modified = false;
   TSCont _cont             = nullptr;
+  TSCont _conf_rld         = nullptr;
+  TSAction _conf_rld_act   = nullptr;
   StringSet _v4includeHeaders;
   bool _v4includeHeaders_modified = false;
   StringSet _v4excludeHeaders;
   bool _v4excludeHeaders_modified = false;
   StringMap _region_map;
   bool _region_map_modified = false;
+  long _expiration          = 0;
+  char *_conf_fname         = nullptr;
+  int _conf_reload_count    = 0;
 };
 
 bool
@@ -465,6 +570,8 @@ S3Config::parse_config(const std::string &config_fname)
         set_exclude_headers(pos2 + 19);
       } else if (0 == strncasecmp(pos2, "v4-region-map=", 14)) {
         set_region_map(pos2 + 14);
+      } else if (0 == strncasecmp(pos2, "expiration=", 11)) {
+        set_expiration(pos2 + 11);
       } else {
         // ToDo: warnings?
       }
@@ -486,6 +593,8 @@ S3Config::parse_config(const std::string &config_fname)
 S3Config *
 ConfigCache::get(const char *fname)
 {
+  S3Config *s3;
+
   struct timeval tv;
 
   gettimeofday(&tv, nullptr);
@@ -496,40 +605,56 @@ ConfigCache::get(const char *fname)
   auto it = _cache.find(config_fname);
 
   if (it != _cache.end()) {
-    if (tv.tv_sec > (it->second.second + _ttl)) {
-      // Update the cached configuration file.
-      S3Config *s3 = new S3Config(false); // false == this config does not get the continuation
-
-      TSDebug(PLUGIN_NAME, "Configuration from %s is stale, reloading", config_fname.c_str());
-      it->second.second = tv.tv_sec;
-      if (s3->parse_config(config_fname)) {
-        it->second.first = s3;
+    unsigned update_status = it->second.update_status;
+    if (tv.tv_sec > (it->second.load_time + _ttl)) {
+      if (!(update_status & 1) && it->second.update_status.compare_exchange_strong(update_status, update_status + 1)) {
+        TSDebug(PLUGIN_NAME, "Configuration from %s is stale, reloading", config_fname.c_str());
+        s3 = new S3Config(false); // false == this config does not get the continuation
+
+        if (s3->parse_config(config_fname)) {
+          s3->set_conf_fname(fname);
+        } else {
+          // Failed the configuration parse... Set the cache response to nullptr
+          delete s3;
+          s3 = nullptr;
+          TSAssert(!"Configuration parsing / caching failed");
+        }
+
+        delete it->second.config;
+        it->second.config    = s3;
+        it->second.load_time = tv.tv_sec;
+
+        // Update is complete.
+        ++it->second.update_status;
       } else {
-        // Failed the configuration parse... Set the cache response to nullptr
-        delete s3;
-        it->second.first = nullptr;
+        // This thread lost the race with another thread that is also reloading
+        // the config for this file. Wait for the other thread to finish reloading.
+        while (it->second.update_status & 1) {
+          // Hopefully yielding will sleep the thread at least until the next
+          // scheduler interrupt, preventing a busy wait.
+          std::this_thread::yield();
+        }
+        s3 = it->second.config;
       }
     } else {
       TSDebug(PLUGIN_NAME, "Configuration from %s is fresh, reusing", config_fname.c_str());
+      s3 = it->second.config;
     }
-    return it->second.first;
   } else {
     // Create a new cached file.
-    S3Config *s3 = new S3Config(false); // false == this config does not get the continuation
+    s3 = new S3Config(false); // false == this config does not get the continuation
 
+    TSDebug(PLUGIN_NAME, "Parsing and caching configuration from %s, version:%d", config_fname.c_str(), s3->version());
     if (s3->parse_config(config_fname)) {
-      _cache[config_fname] = std::make_pair(s3, tv.tv_sec);
-      TSDebug(PLUGIN_NAME, "Parsing and caching configuration from %s, version:%d", config_fname.c_str(), s3->version());
+      s3->set_conf_fname(fname);
+      _cache.emplace(config_fname, _ConfigData(s3, tv.tv_sec));
     } else {
       delete s3;
-      return nullptr;
+      s3 = nullptr;
+      TSAssert(!"Configuration parsing / caching failed");
     }
-
-    return s3;
   }
-
-  TSAssert(!"Configuration parsing / caching failed");
-  return nullptr;
+  return s3;
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -876,6 +1001,11 @@ event_handler(TSCont cont, TSEvent event, void *edata)
   switch (event) {
   case TS_EVENT_HTTP_SEND_REQUEST_HDR:
     if (request.initialize()) {
+      while (s3->reload_waiting) {
+        std::this_thread::yield();
+      }
+
+      std::shared_lock lock(s3->reload_mutex);
       status = request.authorize(s3);
     }
 
@@ -897,6 +1027,63 @@ event_handler(TSCont cont, TSEvent event, void *edata)
   return 0;
 }
 
+// If the token has more than one hour to expire, reload is scheduled one hour before expiration.
+// If the token has less than one hour to expire, reload is scheduled 15 minutes before expiration.
+// If the token has less than 15 minutes to expire, reload is scheduled at the expiration time.
+static long
+cal_reload_delay(long time_diff)
+{
+  if (time_diff > 3600) {
+    return time_diff - 3600;
+  } else if (time_diff > 900) {
+    return time_diff - 900;
+  } else {
+    return time_diff;
+  }
+}
+
+int
+config_reloader(TSCont cont, TSEvent event, void *edata)
+{
+  TSDebug(PLUGIN_NAME, "reloading configs");
+  S3Config *s3          = static_cast<S3Config *>(TSContDataGet(cont));
+  S3Config *file_config = gConfCache.get(s3->conf_fname());
+
+  if (!file_config || !file_config->valid()) {
+    TSError("[%s] requires both shared and AWS secret configuration", PLUGIN_NAME);
+    return TS_ERROR;
+  }
+
+  s3->reload_waiting = true;
+  {
+    std::unique_lock lock(s3->reload_mutex);
+    s3->copy_changes_from(file_config);
+  }
+  s3->reload_waiting = false;
+
+  if (s3->expiration() == 0) {
+    TSDebug(PLUGIN_NAME, "disabling auto config reload");
+  } else {
+    // auto reload is scheduled to be 5 minutes before the expiration time to get some headroom
+    long time_diff = s3->expiration() -
+                     std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+    if (time_diff > 0) {
+      long delay = cal_reload_delay(time_diff);
+      TSDebug(PLUGIN_NAME, "scheduling config reload with %ld seconds delay", delay);
+      s3->reset_conf_reload_count();
+      s3->schedule_conf_reload(delay);
+    } else {
+      TSDebug(PLUGIN_NAME, "config expiration time is in the past, re-checking in 1 minute");
+      if (s3->incr_conf_reload_count() == 10) {
+        TSError("[%s] tried to reload config automatically but failed, please try manual reloading the config", PLUGIN_NAME);
+      }
+      s3->schedule_conf_reload(60);
+    }
+  }
+
+  return TS_SUCCESS;
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // Initialize the plugin.
 //
@@ -1000,6 +1187,23 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE
     return TS_ERROR;
   }
 
+  if (s3->expiration() == 0) {
+    TSDebug(PLUGIN_NAME, "disabling auto config reload");
+  } else {
+    // auto reload is scheduled to be 5 minutes before the expiration time to get some headroom
+    long time_diff = s3->expiration() -
+                     std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+    if (time_diff > 0) {
+      long delay = cal_reload_delay(time_diff);
+      TSDebug(PLUGIN_NAME, "scheduling config reload with %ld seconds delay", delay);
+      s3->reset_conf_reload_count();
+      s3->schedule_conf_reload(delay);
+    } else {
+      TSDebug(PLUGIN_NAME, "config expiration time is in the past, re-checking in 1 minute");
+      s3->schedule_conf_reload(60);
+    }
+  }
+
   *ih = static_cast<void *>(s3);
   TSDebug(PLUGIN_NAME, "New rule: access_key=%s, virtual_host=%s, version=%d", s3->keyid(), s3->virt_host() ? "yes" : "no",
           s3->version());