You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2017/11/28 14:53:46 UTC
[trafficserver] branch master updated: Move NetHandler
initialization to be static and not per thread. Remove update race
conditions. Related to issue #2761
This is an automated email from the ASF dual-hosted git repository.
amc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push:
new 69343fe Move NetHandler initialization to be static and not per thread. Remove update race conditions. Related to issue #2761
69343fe is described below
commit 69343fe99438393caa2f852dea0a62fa3de31f54
Author: Alan M. Carroll <am...@apache.org>
AuthorDate: Sun Nov 5 14:30:22 2017 -0600
Move NetHandler initialization to be static and not per thread.
Remove update race conditions.
Related to issue #2761
---
iocore/dns/DNS.cc | 3 +-
iocore/net/P_UnixNet.h | 74 ++++++++++++----
iocore/net/UnixNet.cc | 187 +++++++++++++++++++--------------------
iocore/net/UnixNetProcessor.cc | 4 +
iocore/net/UnixNetVConnection.cc | 2 +-
5 files changed, 154 insertions(+), 116 deletions(-)
diff --git a/iocore/dns/DNS.cc b/iocore/dns/DNS.cc
index f7e57ff..dd8ebe7 100644
--- a/iocore/dns/DNS.cc
+++ b/iocore/dns/DNS.cc
@@ -226,7 +226,8 @@ DNSProcessor::start(int, size_t stacksize)
if (dns_thread > 0) {
// TODO: Hmmm, should we just get a single thread some other way?
- ET_DNS = eventProcessor.register_event_type("ET_DNS");
+ ET_DNS = eventProcessor.register_event_type("ET_DNS");
+ NetHandler::active_thread_types[ET_DNS] = true;
eventProcessor.schedule_spawn(&initialize_thread_for_net, ET_DNS);
eventProcessor.spawn_event_threads(ET_DNS, 1, stacksize);
} else {
diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h
index 07ebcd1..6c69bd3 100644
--- a/iocore/net/P_UnixNet.h
+++ b/iocore/net/P_UnixNet.h
@@ -24,6 +24,8 @@
#ifndef __P_UNIXNET_H__
#define __P_UNIXNET_H__
+#include <bitset>
+
#include "ts/ink_platform.h"
#define USE_EDGE_TRIGGER_EPOLL 1
@@ -111,6 +113,7 @@ struct EventIO {
#include "P_DNSConnection.h"
#include "P_UnixUDPConnection.h"
#include "P_UnixPollDescriptor.h"
+#include <limits>
class UnixNetVConnection;
class NetHandler;
@@ -223,11 +226,12 @@ struct PollCont : public Continuation {
//
class NetHandler : public Continuation, public EThread::LoopTailHandler
{
+ using self_type = NetHandler; ///< Self reference type.
public:
// @a thread and @a trigger_event are redundant - you can get the former from the latter.
// If we don't get rid of @a trigger_event we should remove @a thread.
- EThread *thread = nullptr;
- Event *trigger_event;
+ EThread *thread = nullptr;
+ Event *trigger_event = nullptr;
QueM(UnixNetVConnection, NetState, read, ready_link) read_ready_list;
QueM(UnixNetVConnection, NetState, write, ready_link) write_ready_list;
Que(UnixNetVConnection, link) open_list;
@@ -235,21 +239,50 @@ public:
ASLLM(UnixNetVConnection, NetState, read, enable_link) read_enable_list;
ASLLM(UnixNetVConnection, NetState, write, enable_link) write_enable_list;
Que(UnixNetVConnection, keep_alive_queue_link) keep_alive_queue;
- uint32_t keep_alive_queue_size;
+ uint32_t keep_alive_queue_size = 0;
Que(UnixNetVConnection, active_queue_link) active_queue;
- uint32_t active_queue_size;
- uint32_t max_connections_per_thread_in;
- uint32_t max_connections_active_per_thread_in;
-
- // configuration settings for managing the active and keep-alive queues
- uint32_t max_connections_in;
- uint32_t max_connections_active_in;
- uint32_t inactive_threashold_in;
- uint32_t transaction_no_activity_timeout_in;
- uint32_t keep_alive_no_activity_timeout_in;
- uint32_t default_inactivity_timeout;
-
- int startNetEvent(int event, Event *data);
+ uint32_t active_queue_size = 0;
+
+ /// configuration settings for managing the active and keep-alive queues
+ struct Config {
+ uint32_t max_connections_in = 0;
+ uint32_t max_connections_active_in = 0;
+ uint32_t inactive_threshold_in = 0;
+ uint32_t transaction_no_activity_timeout_in = 0;
+ uint32_t keep_alive_no_activity_timeout_in = 0;
+ uint32_t default_inactivity_timeout = 0;
+
+ /** Return the address of the first value in this struct.
+
+ Doing updates is much easier if we treat this config struct as an array.
+ Making it a method means the knowledge of which member is the first one
+ is localized to this struct, not scattered about.
+ */
+ uint32_t &operator[](int n) { return *(&max_connections_in + n); }
+ };
+ /** Static global config, set and updated per process.
+
+ This is updated asynchronously and then events are sent to the NetHandler instances per thread
+ to copy to the per thread config at a convenient time. Because these are updated independently
+ from the command line, the update events just copy a single value from the global to the
+ local. This mechanism relies on members being identical types.
+ */
+ static Config global_config;
+ Config config; ///< Per thread copy of the @c global_config
+ // Active and keep alive queue values that depend on other configuration values.
+ // These are never updated directly, they are computed from other config values.
+ uint32_t max_connections_per_thread_in = 0;
+ uint32_t max_connections_active_per_thread_in = 0;
+ /// Number of configuration items in @c Config.
+ static constexpr int CONFIG_ITEM_COUNT = sizeof(Config) / sizeof(uint32_t);
+ /// Which members of @c Config the per thread values depend on.
+ /// If one of these is updated, the per thread values must also be updated.
+ static const std::bitset<CONFIG_ITEM_COUNT> config_value_affects_per_thread_value;
+ /// Set of thread types in which nethandlers are active.
+ /// This enables signaling the correct instances when the configuration is updated.
+ /// Event type threads that use @c NetHandler must set the corresponding bit.
+ static std::bitset<std::numeric_limits<unsigned int>::digits> active_thread_types;
+
int mainNetEvent(int event, Event *data);
int waitForActivity(ink_hrtime timeout);
void process_enabled_list();
@@ -260,7 +293,11 @@ public:
void remove_from_keep_alive_queue(UnixNetVConnection *vc);
bool add_to_active_queue(UnixNetVConnection *vc);
void remove_from_active_queue(UnixNetVConnection *vc);
- void configure_per_thread();
+
+ /// Per process initialization logic.
+ static void init_for_process();
+ /// Update configuration values that are per thread and depend on other configuration values.
+ void configure_per_thread_values();
/**
Start to handle read & write event on a UnixNetVConnection.
@@ -315,6 +352,9 @@ public:
private:
void _close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time,
int &total_idle_count);
+
+ /// Static method used as the callbackf for runtime configuration updates.
+ static int update_nethandler_config(const char *name, RecDataT, RecData data, void *);
};
static inline NetHandler *
diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc
index 831c21f..2fe0e3f 100644
--- a/iocore/net/UnixNet.cc
+++ b/iocore/net/UnixNet.cc
@@ -32,6 +32,10 @@ int fds_throttle;
int fds_limit = 8000;
ink_hrtime last_transient_accept_error;
+NetHandler::Config NetHandler::global_config;
+std::bitset<std::numeric_limits<unsigned int>::digits> NetHandler::active_thread_types;
+const std::bitset<NetHandler::CONFIG_ITEM_COUNT> NetHandler::config_value_affects_per_thread_value{0x3};
+
extern "C" void fd_reify(struct ev_loop *);
// INKqa10496
@@ -217,9 +221,6 @@ initialize_thread_for_net(EThread *thread)
nh->mutex = new_ProxyMutex();
nh->thread = thread;
- // This is needed to initialize NetHandler
- thread->schedule_imm(nh);
-
PollCont *pc = get_PollCont(thread);
PollDescriptor *pd = pc->pollDescriptor;
@@ -227,6 +228,8 @@ initialize_thread_for_net(EThread *thread)
int cop_freq = 1;
REC_ReadConfigInteger(cop_freq, "proxy.config.net.inactivity_check_frequency");
+ memcpy(&nh->config, &NetHandler::global_config, sizeof(NetHandler::global_config));
+ nh->configure_per_thread_values();
thread->schedule_every(inactivityCop, HRTIME_SECONDS(cop_freq));
thread->set_tail_handler(nh);
@@ -241,65 +244,86 @@ initialize_thread_for_net(EThread *thread)
// NetHandler method definitions
-NetHandler::NetHandler()
- : Continuation(nullptr),
- trigger_event(nullptr),
- keep_alive_queue_size(0),
- active_queue_size(0),
- max_connections_per_thread_in(0),
- max_connections_active_per_thread_in(0),
- max_connections_in(0),
- max_connections_active_in(0),
- inactive_threashold_in(0),
- transaction_no_activity_timeout_in(0),
- keep_alive_no_activity_timeout_in(0),
- default_inactivity_timeout(0)
+NetHandler::NetHandler() : Continuation(nullptr)
{
- SET_HANDLER((NetContHandler)&NetHandler::startNetEvent);
+ SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
}
int
-update_nethandler_config(const char *name, RecDataT data_type ATS_UNUSED, RecData data, void *cookie)
+NetHandler::update_nethandler_config(const char *str, RecDataT, RecData data, void *)
{
- NetHandler *nh = static_cast<NetHandler *>(cookie);
- ink_assert(nh != nullptr);
- bool update_per_thread_configuration = false;
-
- if (nh != nullptr) {
- if (strcmp(name, "proxy.config.net.max_connections_in") == 0) {
- Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
- nh->max_connections_in = data.rec_int;
- update_per_thread_configuration = true;
- }
- if (strcmp(name, "proxy.config.net.max_active_connections_in") == 0) {
- Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %" PRId64, data.rec_int);
- nh->max_connections_active_in = data.rec_int;
- update_per_thread_configuration = true;
- }
- if (strcmp(name, "proxy.config.net.inactive_threashold_in") == 0) {
- Debug("net_queue", "proxy.config.net.inactive_threashold_in updated to %" PRId64, data.rec_int);
- nh->inactive_threashold_in = data.rec_int;
- }
- if (strcmp(name, "proxy.config.net.transaction_no_activity_timeout_in") == 0) {
- Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int);
- nh->transaction_no_activity_timeout_in = data.rec_int;
- }
- if (strcmp(name, "proxy.config.net.keep_alive_no_activity_timeout_in") == 0) {
- Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int);
- nh->keep_alive_no_activity_timeout_in = data.rec_int;
- }
- if (strcmp(name, "proxy.config.net.default_inactivity_timeout") == 0) {
- Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int);
- nh->default_inactivity_timeout = data.rec_int;
- }
+ uint32_t *updated_member = nullptr; // direct pointer to config member for update.
+ ts::string_view name{str};
+
+ if (name == "proxy.config.net.max_connections_in"_sv) {
+ updated_member = &NetHandler::global_config.max_connections_in;
+ Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
+ } else if (name == "proxy.config.net.max_active_connections_in"_sv) {
+ updated_member = &NetHandler::global_config.max_connections_active_in;
+ Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %" PRId64, data.rec_int);
+ } else if (name == "proxy.config.net.inactive_threshold_in"_sv) {
+ updated_member = &NetHandler::global_config.inactive_threshold_in;
+ Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %" PRId64, data.rec_int);
+ } else if (name == "proxy.config.net.transaction_no_activity_timeout_in"_sv) {
+ updated_member = &NetHandler::global_config.transaction_no_activity_timeout_in;
+ Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int);
+ } else if (name == "proxy.config.net.keep_alive_no_activity_timeout_in"_sv) {
+ updated_member = &NetHandler::global_config.keep_alive_no_activity_timeout_in;
+ Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int);
+ } else if (name == "proxy.config.net.default_inactivity_timeout"_sv) {
+ updated_member = &NetHandler::global_config.default_inactivity_timeout;
+ Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int);
}
- if (update_per_thread_configuration == true) {
- nh->configure_per_thread();
+ if (updated_member) {
+ *updated_member = data.rec_int; // do the actual update.
+ // portable form of the update, an index converted to <void*> so it can be passed as an event cookie.
+ void *idx = reinterpret_cast<void *>(static_cast<intptr_t>(updated_member - &global_config[0]));
+ // Signal the NetHandler instances, passing the index of the updated config value.
+ for (int i = 0; i < eventProcessor.n_thread_groups; ++i) {
+ if (!active_thread_types[i])
+ continue;
+ for (EThread **tp = eventProcessor.thread_group[i]._thread,
+ **limit = eventProcessor.thread_group[i]._thread + eventProcessor.thread_group[i]._count;
+ tp < limit; ++tp) {
+ NetHandler *nh = get_NetHandler(*tp);
+ if (nh)
+ nh->thread->schedule_imm(nh, TS_EVENT_MGMT_UPDATE, idx);
+ }
+ }
}
return REC_ERR_OKAY;
}
+
+void
+NetHandler::init_for_process()
+{
+ // read configuration values and setup callbacks for when they change
+ REC_ReadConfigInt32(global_config.max_connections_in, "proxy.config.net.max_connections_in");
+ REC_ReadConfigInt32(global_config.max_connections_active_in, "proxy.config.net.max_connections_active_in");
+ REC_ReadConfigInt32(global_config.inactive_threshold_in, "proxy.config.net.inactive_threshold_in");
+ REC_ReadConfigInt32(global_config.transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
+ REC_ReadConfigInt32(global_config.keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
+ REC_ReadConfigInt32(global_config.default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
+
+ RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, nullptr);
+ RecRegisterConfigUpdateCb("proxy.config.net.max_active_connections_in", update_nethandler_config, nullptr);
+ RecRegisterConfigUpdateCb("proxy.config.net.inactive_threshold_in", update_nethandler_config, nullptr);
+ RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, nullptr);
+ RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, nullptr);
+ RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, nullptr);
+
+ Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", global_config.max_connections_in);
+ Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %d", global_config.max_connections_active_in);
+ Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %d", global_config.inactive_threshold_in);
+ Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d",
+ global_config.transaction_no_activity_timeout_in);
+ Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d",
+ global_config.keep_alive_no_activity_timeout_in);
+ Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", global_config.default_inactivity_timeout);
+}
+
//
// Function used to release a UnixNetVConnection and free it.
//
@@ -321,42 +345,6 @@ NetHandler::free_netvc(UnixNetVConnection *netvc)
}
//
-// Initialization here, in the thread in which we will be executing
-// from now on.
-//
-int
-NetHandler::startNetEvent(int event, Event *e)
-{
- // read configuration values and setup callbacks for when they change
- REC_ReadConfigInt32(max_connections_in, "proxy.config.net.max_connections_in");
- REC_ReadConfigInt32(max_connections_active_in, "proxy.config.net.max_connections_active_in");
- REC_ReadConfigInt32(inactive_threashold_in, "proxy.config.net.inactive_threashold_in");
- REC_ReadConfigInt32(transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
- REC_ReadConfigInt32(keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
- REC_ReadConfigInt32(default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
-
- RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, (void *)this);
- RecRegisterConfigUpdateCb("proxy.config.net.max_active_connections_in", update_nethandler_config, (void *)this);
- RecRegisterConfigUpdateCb("proxy.config.net.inactive_threashold_in", update_nethandler_config, (void *)this);
- RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, (void *)this);
- RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, (void *)this);
- RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, (void *)this);
-
- Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", max_connections_in);
- Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %d", max_connections_active_in);
- Debug("net_queue", "proxy.config.net.inactive_threashold_in updated to %d", inactive_threashold_in);
- Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d", transaction_no_activity_timeout_in);
- Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d", keep_alive_no_activity_timeout_in);
- Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", default_inactivity_timeout);
-
- configure_per_thread();
-
- (void)event;
-
- return EVENT_CONT;
-}
-
-//
// Move VC's enabled on a different thread to the ready list
//
void
@@ -451,18 +439,23 @@ NetHandler::process_ready_list()
}
#endif /* !USE_EDGE_TRIGGER */
}
+
//
// The main event for NetHandler
-// This is called every proxy.config.net.event_period, and handles all IO operations scheduled
-// for this period.
-//
int
NetHandler::mainNetEvent(int event, Event *e)
{
- ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
- (void)event;
- (void)e;
- return this->waitForActivity(-1);
+ if (TS_EVENT_MGMT_UPDATE == event) {
+ intptr_t idx = reinterpret_cast<intptr_t>(e->cookie);
+ // Copy to the same offset in the instance struct.
+ config[idx] = global_config[idx];
+ if (config_value_affects_per_thread_value[idx])
+ this->configure_per_thread_values();
+ return EVENT_CONT;
+ } else {
+ ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL));
+ return this->waitForActivity(-1);
+ }
}
int
@@ -589,12 +582,12 @@ NetHandler::manage_active_queue(bool ignore_queue_size = false)
}
void
-NetHandler::configure_per_thread()
+NetHandler::configure_per_thread_values()
{
// figure out the number of threads and calculate the number of connections per thread
int threads = eventProcessor.thread_group[ET_NET]._count;
- max_connections_per_thread_in = max_connections_in / threads;
- max_connections_active_per_thread_in = max_connections_active_in / threads;
+ max_connections_per_thread_in = config.max_connections_in / threads;
+ max_connections_active_per_thread_in = config.max_connections_active_in / threads;
Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in, threads);
Debug("net_queue", "max_connections_active_per_thread_in updated to %d threads: %d", max_connections_active_per_thread_in,
threads);
diff --git a/iocore/net/UnixNetProcessor.cc b/iocore/net/UnixNetProcessor.cc
index f4fd335..bba56e5 100644
--- a/iocore/net/UnixNetProcessor.cc
+++ b/iocore/net/UnixNetProcessor.cc
@@ -419,6 +419,10 @@ UnixNetProcessor::init()
if (0 == accept_mss)
REC_ReadConfigInteger(accept_mss, "proxy.config.net.sock_mss_in");
+ // NetHandler - do the global configuration initialization and then
+ // schedule per thread start up logic. Global init is done only here.
+ NetHandler::init_for_process();
+ NetHandler::active_thread_types[ET_NET] = true;
eventProcessor.schedule_spawn(&initialize_thread_for_net, etype);
RecData d;
diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc
index cd5036e..9dc3be4 100644
--- a/iocore/net/UnixNetVConnection.cc
+++ b/iocore/net/UnixNetVConnection.cc
@@ -1396,7 +1396,7 @@ UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout_in)
Debug("socket", "Set inactive timeout=%" PRId64 ", for NetVC=%p", timeout_in, this);
if (timeout_in == 0) {
// set default inactivity timeout
- timeout_in = HRTIME_SECONDS(nh->default_inactivity_timeout);
+ timeout_in = HRTIME_SECONDS(nh->config.default_inactivity_timeout);
}
inactivity_timeout_in = timeout_in;
next_inactivity_timeout_at = Thread::get_hrtime() + inactivity_timeout_in;
--
To stop receiving notification emails like this one, please contact
['"commits@trafficserver.apache.org" <co...@trafficserver.apache.org>'].