Posted to commits@trafficserver.apache.org by am...@apache.org on 2017/11/28 14:53:46 UTC

[trafficserver] branch master updated: Move NetHandler initialization to be static and not per thread. Remove update race conditions. Related to issue #2761

This is an automated email from the ASF dual-hosted git repository.

amc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 69343fe  Move NetHandler initialization to be static and not per thread. Remove update race conditions. Related to issue #2761
69343fe is described below

commit 69343fe99438393caa2f852dea0a62fa3de31f54
Author: Alan M. Carroll <am...@apache.org>
AuthorDate: Sun Nov 5 14:30:22 2017 -0600

    Move NetHandler initialization to be static and not per thread.
    Remove update race conditions.
    Related to issue #2761
---
 iocore/dns/DNS.cc                |   3 +-
 iocore/net/P_UnixNet.h           |  74 ++++++++++++----
 iocore/net/UnixNet.cc            | 187 +++++++++++++++++++--------------------
 iocore/net/UnixNetProcessor.cc   |   4 +
 iocore/net/UnixNetVConnection.cc |   2 +-
 5 files changed, 154 insertions(+), 116 deletions(-)

diff --git a/iocore/dns/DNS.cc b/iocore/dns/DNS.cc
index f7e57ff..dd8ebe7 100644
--- a/iocore/dns/DNS.cc
+++ b/iocore/dns/DNS.cc
@@ -226,7 +226,8 @@ DNSProcessor::start(int, size_t stacksize)
 
   if (dns_thread > 0) {
     // TODO: Hmmm, should we just get a single thread some other way?
-    ET_DNS = eventProcessor.register_event_type("ET_DNS");
+    ET_DNS                                  = eventProcessor.register_event_type("ET_DNS");
+    NetHandler::active_thread_types[ET_DNS] = true;
     eventProcessor.schedule_spawn(&initialize_thread_for_net, ET_DNS);
     eventProcessor.spawn_event_threads(ET_DNS, 1, stacksize);
   } else {
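
The DNS.cc hunk above shows the registration pattern this commit introduces: an event type that runs a NetHandler flags itself in the static active_thread_types bitset before its threads are spawned, so configuration updates can later signal exactly the right thread groups. A minimal standalone sketch of that pattern, using hypothetical ET_* indices rather than the real register_event_type() machinery:

    #include <bitset>
    #include <cstdio>
    #include <limits>

    // Hypothetical event type indices; the real values come from
    // eventProcessor.register_event_type().
    static const int ET_NET = 0;
    static const int ET_DNS = 1;

    // Mirrors NetHandler::active_thread_types: one bit per thread group.
    static std::bitset<std::numeric_limits<unsigned int>::digits> active_thread_types;

    int
    main()
    {
      // Each subsystem that runs a NetHandler sets its bit before spawning
      // threads, as DNS.cc and UnixNetProcessor.cc now do.
      active_thread_types[ET_NET] = true;
      active_thread_types[ET_DNS] = true;

      // A configuration update later walks only the flagged groups.
      for (int i = 0; i < 2; ++i) {
        if (active_thread_types[i])
          std::printf("signal NetHandler instances in thread group %d\n", i);
      }
      return 0;
    }
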
diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h
index 07ebcd1..6c69bd3 100644
--- a/iocore/net/P_UnixNet.h
+++ b/iocore/net/P_UnixNet.h
@@ -24,6 +24,8 @@
 #ifndef __P_UNIXNET_H__
 #define __P_UNIXNET_H__
 
+#include <bitset>
+
 #include "ts/ink_platform.h"
 
 #define USE_EDGE_TRIGGER_EPOLL 1
@@ -111,6 +113,7 @@ struct EventIO {
 #include "P_DNSConnection.h"
 #include "P_UnixUDPConnection.h"
 #include "P_UnixPollDescriptor.h"
+#include <limits>
 
 class UnixNetVConnection;
 class NetHandler;
@@ -223,11 +226,12 @@ struct PollCont : public Continuation {
 //
 class NetHandler : public Continuation, public EThread::LoopTailHandler
 {
+  using self_type = NetHandler; ///< Self reference type.
 public:
   // @a thread and @a trigger_event are redundant - you can get the former from the latter.
   // If we don't get rid of @a trigger_event we should remove @a thread.
-  EThread *thread = nullptr;
-  Event *trigger_event;
+  EThread *thread      = nullptr;
+  Event *trigger_event = nullptr;
   QueM(UnixNetVConnection, NetState, read, ready_link) read_ready_list;
   QueM(UnixNetVConnection, NetState, write, ready_link) write_ready_list;
   Que(UnixNetVConnection, link) open_list;
@@ -235,21 +239,50 @@ public:
   ASLLM(UnixNetVConnection, NetState, read, enable_link) read_enable_list;
   ASLLM(UnixNetVConnection, NetState, write, enable_link) write_enable_list;
   Que(UnixNetVConnection, keep_alive_queue_link) keep_alive_queue;
-  uint32_t keep_alive_queue_size;
+  uint32_t keep_alive_queue_size = 0;
   Que(UnixNetVConnection, active_queue_link) active_queue;
-  uint32_t active_queue_size;
-  uint32_t max_connections_per_thread_in;
-  uint32_t max_connections_active_per_thread_in;
-
-  // configuration settings for managing the active and keep-alive queues
-  uint32_t max_connections_in;
-  uint32_t max_connections_active_in;
-  uint32_t inactive_threashold_in;
-  uint32_t transaction_no_activity_timeout_in;
-  uint32_t keep_alive_no_activity_timeout_in;
-  uint32_t default_inactivity_timeout;
-
-  int startNetEvent(int event, Event *data);
+  uint32_t active_queue_size = 0;
+
+  /// configuration settings for managing the active and keep-alive queues
+  struct Config {
+    uint32_t max_connections_in                 = 0;
+    uint32_t max_connections_active_in          = 0;
+    uint32_t inactive_threshold_in              = 0;
+    uint32_t transaction_no_activity_timeout_in = 0;
+    uint32_t keep_alive_no_activity_timeout_in  = 0;
+    uint32_t default_inactivity_timeout         = 0;
+
+    /** Return the address of the first value in this struct.
+
+        Doing updates is much easier if we treat this config struct as an array.
+        Making it a method means the knowledge of which member is the first one
+        is localized to this struct, not scattered about.
+     */
+    uint32_t &operator[](int n) { return *(&max_connections_in + n); }
+  };
+  /** Static global config, set and updated per process.
+
+      This is updated asynchronously; events are then sent to the per thread NetHandler instances,
+      which copy the new values into their local config at a convenient time. Because each value is
+      updated independently (e.g. from the command line), each update event copies a single value
+      from the global config to the local one. This mechanism relies on all members being the same type.
+  */
+  static Config global_config;
+  Config config; ///< Per thread copy of the @c global_config
+  // Active and keep-alive queue limits derived from other configuration values.
+  // These are never set directly; they are recomputed whenever their inputs change.
+  uint32_t max_connections_per_thread_in        = 0;
+  uint32_t max_connections_active_per_thread_in = 0;
+  /// Number of configuration items in @c Config.
+  static constexpr int CONFIG_ITEM_COUNT = sizeof(Config) / sizeof(uint32_t);
+  /// Which members of @c Config the per thread values depend on.
+  /// If one of these is updated, the per thread values must also be updated.
+  static const std::bitset<CONFIG_ITEM_COUNT> config_value_affects_per_thread_value;
+  /// Set of thread types in which nethandlers are active.
+  /// This enables signaling the correct instances when the configuration is updated.
+  /// Event type threads that use @c NetHandler must set the corresponding bit.
+  static std::bitset<std::numeric_limits<unsigned int>::digits> active_thread_types;
+
   int mainNetEvent(int event, Event *data);
   int waitForActivity(ink_hrtime timeout);
   void process_enabled_list();
@@ -260,7 +293,11 @@ public:
   void remove_from_keep_alive_queue(UnixNetVConnection *vc);
   bool add_to_active_queue(UnixNetVConnection *vc);
   void remove_from_active_queue(UnixNetVConnection *vc);
-  void configure_per_thread();
+
+  /// Per process initialization logic.
+  static void init_for_process();
+  /// Update configuration values that are per thread and depend on other configuration values.
+  void configure_per_thread_values();
 
   /**
     Start to handle read & write event on a UnixNetVConnection.
@@ -315,6 +352,9 @@ public:
 private:
   void _close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time,
                  int &total_idle_count);
+
+  /// Static method used as the callback for runtime configuration updates.
+  static int update_nethandler_config(const char *name, RecDataT, RecData data, void *);
 };
 
 static inline NetHandler *
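
The key idiom in this header is treating Config as an array: because every member is a uint32_t, operator[] can address any member by index, which is what lets a single update event name the changed field by number. A compilable sketch under that same all-uint32_t layout assumption (the three member names are illustrative stand-ins for the six real ones):

    #include <cstdint>
    #include <cstdio>

    struct Config {
      uint32_t max_connections_in         = 0;
      uint32_t max_connections_active_in  = 0;
      uint32_t default_inactivity_timeout = 0;

      // Address members by index, anchored at the first member.
      uint32_t &operator[](int n) { return *(&max_connections_in + n); }
    };

    int
    main()
    {
      Config global, local;
      global.max_connections_active_in = 500;

      // The update side computes the changed member's index once...
      int idx = static_cast<int>(&global.max_connections_active_in - &global[0]);

      // ...and the per-thread side copies just that slot.
      local[idx] = global[idx];
      std::printf("local.max_connections_active_in = %u\n", local.max_connections_active_in);
      return 0;
    }

This only holds up because every member has the identical type, which is exactly the constraint the doc comment on global_config calls out.
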
diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc
index 831c21f..2fe0e3f 100644
--- a/iocore/net/UnixNet.cc
+++ b/iocore/net/UnixNet.cc
@@ -32,6 +32,10 @@ int fds_throttle;
 int fds_limit = 8000;
 ink_hrtime last_transient_accept_error;
 
+NetHandler::Config NetHandler::global_config;
+std::bitset<std::numeric_limits<unsigned int>::digits> NetHandler::active_thread_types;
+const std::bitset<NetHandler::CONFIG_ITEM_COUNT> NetHandler::config_value_affects_per_thread_value{0x3};
+
 extern "C" void fd_reify(struct ev_loop *);
 
 // INKqa10496
@@ -217,9 +221,6 @@ initialize_thread_for_net(EThread *thread)
   nh->mutex  = new_ProxyMutex();
   nh->thread = thread;
 
-  // This is needed to initialize NetHandler
-  thread->schedule_imm(nh);
-
   PollCont *pc       = get_PollCont(thread);
   PollDescriptor *pd = pc->pollDescriptor;
 
@@ -227,6 +228,8 @@ initialize_thread_for_net(EThread *thread)
   int cop_freq                 = 1;
 
   REC_ReadConfigInteger(cop_freq, "proxy.config.net.inactivity_check_frequency");
+  memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config));
+  nh->configure_per_thread_values();
   thread->schedule_every(inactivityCop, HRTIME_SECONDS(cop_freq));
 
   thread->set_tail_handler(nh);
@@ -241,65 +244,86 @@ initialize_thread_for_net(EThread *thread)
 
 // NetHandler method definitions
 
-NetHandler::NetHandler()
-  : Continuation(nullptr),
-    trigger_event(nullptr),
-    keep_alive_queue_size(0),
-    active_queue_size(0),
-    max_connections_per_thread_in(0),
-    max_connections_active_per_thread_in(0),
-    max_connections_in(0),
-    max_connections_active_in(0),
-    inactive_threashold_in(0),
-    transaction_no_activity_timeout_in(0),
-    keep_alive_no_activity_timeout_in(0),
-    default_inactivity_timeout(0)
+NetHandler::NetHandler() : Continuation(nullptr)
 {
-  SET_HANDLER((NetContHandler)&NetHandler::startNetEvent);
+  SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
 }
 
 int
-update_nethandler_config(const char *name, RecDataT data_type ATS_UNUSED, RecData data, void *cookie)
+NetHandler::update_nethandler_config(const char *str, RecDataT, RecData data, void *)
 {
-  NetHandler *nh = static_cast<NetHandler *>(cookie);
-  ink_assert(nh != nullptr);
-  bool update_per_thread_configuration = false;
-
-  if (nh != nullptr) {
-    if (strcmp(name, "proxy.config.net.max_connections_in") == 0) {
-      Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
-      nh->max_connections_in          = data.rec_int;
-      update_per_thread_configuration = true;
-    }
-    if (strcmp(name, "proxy.config.net.max_active_connections_in") == 0) {
-      Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %" PRId64, data.rec_int);
-      nh->max_connections_active_in   = data.rec_int;
-      update_per_thread_configuration = true;
-    }
-    if (strcmp(name, "proxy.config.net.inactive_threashold_in") == 0) {
-      Debug("net_queue", "proxy.config.net.inactive_threashold_in updated to %" PRId64, data.rec_int);
-      nh->inactive_threashold_in = data.rec_int;
-    }
-    if (strcmp(name, "proxy.config.net.transaction_no_activity_timeout_in") == 0) {
-      Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int);
-      nh->transaction_no_activity_timeout_in = data.rec_int;
-    }
-    if (strcmp(name, "proxy.config.net.keep_alive_no_activity_timeout_in") == 0) {
-      Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int);
-      nh->keep_alive_no_activity_timeout_in = data.rec_int;
-    }
-    if (strcmp(name, "proxy.config.net.default_inactivity_timeout") == 0) {
-      Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int);
-      nh->default_inactivity_timeout = data.rec_int;
-    }
+  uint32_t *updated_member = nullptr; // direct pointer to config member for update.
+  ts::string_view name{str};
+
+  if (name == "proxy.config.net.max_connections_in"_sv) {
+    updated_member = &NetHandler::global_config.max_connections_in;
+    Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
+  } else if (name == "proxy.config.net.max_active_connections_in"_sv) {
+    updated_member = &NetHandler::global_config.max_connections_active_in;
+    Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %" PRId64, data.rec_int);
+  } else if (name == "proxy.config.net.inactive_threshold_in"_sv) {
+    updated_member = &NetHandler::global_config.inactive_threshold_in;
+    Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %" PRId64, data.rec_int);
+  } else if (name == "proxy.config.net.transaction_no_activity_timeout_in"_sv) {
+    updated_member = &NetHandler::global_config.transaction_no_activity_timeout_in;
+    Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int);
+  } else if (name == "proxy.config.net.keep_alive_no_activity_timeout_in"_sv) {
+    updated_member = &NetHandler::global_config.keep_alive_no_activity_timeout_in;
+    Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int);
+  } else if (name == "proxy.config.net.default_inactivity_timeout"_sv) {
+    updated_member = &NetHandler::global_config.default_inactivity_timeout;
+    Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int);
   }
 
-  if (update_per_thread_configuration == true) {
-    nh->configure_per_thread();
+  if (updated_member) {
+    *updated_member = data.rec_int; // do the actual update.
+    // Portable form of the update: the member's index, converted to void *, so it can be passed as an event cookie.
+    void *idx = reinterpret_cast<void *>(static_cast<intptr_t>(updated_member - &global_config[0]));
+    // Signal the NetHandler instances, passing the index of the updated config value.
+    for (int i = 0; i < eventProcessor.n_thread_groups; ++i) {
+      if (!active_thread_types[i])
+        continue;
+      for (EThread **tp    = eventProcessor.thread_group[i]._thread,
+                   **limit = eventProcessor.thread_group[i]._thread + eventProcessor.thread_group[i]._count;
+           tp < limit; ++tp) {
+        NetHandler *nh = get_NetHandler(*tp);
+        if (nh)
+          nh->thread->schedule_imm(nh, TS_EVENT_MGMT_UPDATE, idx);
+      }
+    }
   }
 
   return REC_ERR_OKAY;
 }
+
+void
+NetHandler::init_for_process()
+{
+  // read configuration values and setup callbacks for when they change
+  REC_ReadConfigInt32(global_config.max_connections_in, "proxy.config.net.max_connections_in");
+  REC_ReadConfigInt32(global_config.max_connections_active_in, "proxy.config.net.max_connections_active_in");
+  REC_ReadConfigInt32(global_config.inactive_threshold_in, "proxy.config.net.inactive_threshold_in");
+  REC_ReadConfigInt32(global_config.transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
+  REC_ReadConfigInt32(global_config.keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
+  REC_ReadConfigInt32(global_config.default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
+
+  RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, nullptr);
+  RecRegisterConfigUpdateCb("proxy.config.net.max_active_connections_in", update_nethandler_config, nullptr);
+  RecRegisterConfigUpdateCb("proxy.config.net.inactive_threshold_in", update_nethandler_config, nullptr);
+  RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, nullptr);
+  RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, nullptr);
+  RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, nullptr);
+
+  Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", global_config.max_connections_in);
+  Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %d", global_config.max_connections_active_in);
+  Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %d", global_config.inactive_threshold_in);
+  Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d",
+        global_config.transaction_no_activity_timeout_in);
+  Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d",
+        global_config.keep_alive_no_activity_timeout_in);
+  Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", global_config.default_inactivity_timeout);
+}
+
 //
 // Function used to release a UnixNetVConnection and free it.
 //
@@ -321,42 +345,6 @@ NetHandler::free_netvc(UnixNetVConnection *netvc)
 }
 
 //
-// Initialization here, in the thread in which we will be executing
-// from now on.
-//
-int
-NetHandler::startNetEvent(int event, Event *e)
-{
-  // read configuration values and setup callbacks for when they change
-  REC_ReadConfigInt32(max_connections_in, "proxy.config.net.max_connections_in");
-  REC_ReadConfigInt32(max_connections_active_in, "proxy.config.net.max_connections_active_in");
-  REC_ReadConfigInt32(inactive_threashold_in, "proxy.config.net.inactive_threashold_in");
-  REC_ReadConfigInt32(transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
-  REC_ReadConfigInt32(keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
-  REC_ReadConfigInt32(default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
-
-  RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, (void *)this);
-  RecRegisterConfigUpdateCb("proxy.config.net.max_active_connections_in", update_nethandler_config, (void *)this);
-  RecRegisterConfigUpdateCb("proxy.config.net.inactive_threashold_in", update_nethandler_config, (void *)this);
-  RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, (void *)this);
-  RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, (void *)this);
-  RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, (void *)this);
-
-  Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", max_connections_in);
-  Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %d", max_connections_active_in);
-  Debug("net_queue", "proxy.config.net.inactive_threashold_in updated to %d", inactive_threashold_in);
-  Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d", transaction_no_activity_timeout_in);
-  Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d", keep_alive_no_activity_timeout_in);
-  Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", default_inactivity_timeout);
-
-  configure_per_thread();
-
-  (void)event;
-
-  return EVENT_CONT;
-}
-
-//
 // Move VC's enabled on a different thread to the ready list
 //
 void
@@ -451,18 +439,23 @@ NetHandler::process_ready_list()
   }
 #endif /* !USE_EDGE_TRIGGER */
 }
+
 //
 // The main event for NetHandler
-// This is called every proxy.config.net.event_period, and handles all IO operations scheduled
-// for this period.
-//
 int
 NetHandler::mainNetEvent(int event, Event *e)
 {
-  ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
-  (void)event;
-  (void)e;
-  return this->waitForActivity(-1);
+  if (TS_EVENT_MGMT_UPDATE == event) {
+    intptr_t idx = reinterpret_cast<intptr_t>(e->cookie);
+    // Copy to the same offset in the instance struct.
+    config[idx] = global_config[idx];
+    if (config_value_affects_per_thread_value[idx])
+      this->configure_per_thread_values();
+    return EVENT_CONT;
+  } else {
+    ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
+    return this->waitForActivity(-1);
+  }
 }
 
 int
@@ -589,12 +582,12 @@ NetHandler::manage_active_queue(bool ignore_queue_size = false)
 }
 
 void
-NetHandler::configure_per_thread()
+NetHandler::configure_per_thread_values()
 {
   // figure out the number of threads and calculate the number of connections per thread
   int threads                          = eventProcessor.thread_group[ET_NET]._count;
-  max_connections_per_thread_in        = max_connections_in / threads;
-  max_connections_active_per_thread_in = max_connections_active_in / threads;
+  max_connections_per_thread_in        = config.max_connections_in / threads;
+  max_connections_active_per_thread_in = config.max_connections_active_in / threads;
   Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in, threads);
   Debug("net_queue", "max_connections_active_per_thread_in updated to %d threads: %d", max_connections_active_per_thread_in,
         threads);
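
UnixNet.cc wires the two halves together: the update callback packs the changed member's index into the event cookie, and mainNetEvent unpacks it, copies that single slot from global_config into the thread-local copy, and recomputes the derived per-thread values only when the slot's bit is set in config_value_affects_per_thread_value. A self-contained sketch of that round trip, using plain arrays in place of the Config struct:

    #include <bitset>
    #include <cstdint>
    #include <cstdio>

    int
    main()
    {
      // Six slots mirror NetHandler::Config; 0x3 marks the first two (the
      // connection limits) as inputs to the derived per-thread values.
      uint32_t global_config[6] = {30000, 500, 0, 0, 0, 0};
      uint32_t config[6]        = {0, 0, 0, 0, 0, 0};
      const std::bitset<6> affects_per_thread{0x3};

      // Pretend max_connections_active_in (slot 1) just changed.
      uint32_t *updated_member = &global_config[1];

      // Update side: pack the member's index into a void * event cookie.
      void *cookie = reinterpret_cast<void *>(static_cast<intptr_t>(updated_member - &global_config[0]));

      // Handler side (TS_EVENT_MGMT_UPDATE): unpack, copy one slot, and
      // recompute derived values only if this slot feeds them.
      intptr_t idx = reinterpret_cast<intptr_t>(cookie);
      config[idx]  = global_config[idx];
      if (affects_per_thread[idx])
        std::printf("slot %ld copied; per-thread limits recomputed\n", static_cast<long>(idx));
      return 0;
    }
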
diff --git a/iocore/net/UnixNetProcessor.cc b/iocore/net/UnixNetProcessor.cc
index f4fd335..bba56e5 100644
--- a/iocore/net/UnixNetProcessor.cc
+++ b/iocore/net/UnixNetProcessor.cc
@@ -419,6 +419,10 @@ UnixNetProcessor::init()
   if (0 == accept_mss)
     REC_ReadConfigInteger(accept_mss, "proxy.config.net.sock_mss_in");
 
+  // NetHandler - do the global configuration initialization and then
+  // schedule per thread start up logic. Global init is done only here.
+  NetHandler::init_for_process();
+  NetHandler::active_thread_types[ET_NET] = true;
   eventProcessor.schedule_spawn(&initialize_thread_for_net, etype);
 
   RecData d;
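
The ordering in this hunk matters for the race removal: init_for_process() must complete before any net thread is spawned, because each thread snapshots global_config as it starts. A toy single-threaded illustration of that dependency (hypothetical minimal types, none of the real ATS machinery):

    #include <cstdint>
    #include <cstdio>
    #include <cstring>

    struct Config {
      uint32_t max_connections_in = 0;
    };

    static Config global_config;

    static void
    init_for_process()
    {
      global_config.max_connections_in = 30000; // stands in for REC_ReadConfigInt32()
    }

    static void
    initialize_thread(Config &per_thread)
    {
      // Mirrors the memcpy of global_config into nh->config in initialize_thread_for_net().
      std::memcpy(&per_thread, &global_config, sizeof(global_config));
    }

    int
    main()
    {
      Config thread_config;
      init_for_process();               // global init done exactly once, first
      initialize_thread(thread_config); // each spawned thread then copies it
      std::printf("thread sees max_connections_in = %u\n", thread_config.max_connections_in);
      return 0;
    }
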
diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc
index cd5036e..9dc3be4 100644
--- a/iocore/net/UnixNetVConnection.cc
+++ b/iocore/net/UnixNetVConnection.cc
@@ -1396,7 +1396,7 @@ UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout_in)
   Debug("socket", "Set inactive timeout=%" PRId64 ", for NetVC=%p", timeout_in, this);
   if (timeout_in == 0) {
     // set default inactivity timeout
-    timeout_in = HRTIME_SECONDS(nh->default_inactivity_timeout);
+    timeout_in = HRTIME_SECONDS(nh->config.default_inactivity_timeout);
   }
   inactivity_timeout_in      = timeout_in;
   next_inactivity_timeout_at = Thread::get_hrtime() + inactivity_timeout_in;
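
This last hunk is the consumer side: a zero timeout now falls back to the per-thread config copy instead of a loose NetHandler member. A small sketch of the fallback, with a stand-in HRTIME_SECONDS() and a hypothetical configured value of 120 seconds:

    #include <cstdint>
    #include <cstdio>

    using ink_hrtime = int64_t;

    // Stand-in for the real HRTIME_SECONDS() macro (nanoseconds per second).
    constexpr ink_hrtime
    HRTIME_SECONDS(uint32_t s)
    {
      return static_cast<ink_hrtime>(s) * 1000000000;
    }

    int
    main()
    {
      uint32_t default_inactivity_timeout = 120; // hypothetical per-thread config value
      ink_hrtime timeout_in               = 0;   // caller passed 0: use the configured default

      if (timeout_in == 0)
        timeout_in = HRTIME_SECONDS(default_inactivity_timeout);

      std::printf("effective timeout: %lld ns\n", static_cast<long long>(timeout_in));
      return 0;
    }
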
