You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by bc...@apache.org on 2015/06/19 04:56:04 UTC
trafficserver git commit: TS-3313: Added active queue for incoming
connections
Repository: trafficserver
Updated Branches:
refs/heads/master bec6dd64a -> 974e8e3ab
TS-3313: Added active queue for incoming connections
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/974e8e3a
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/974e8e3a
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/974e8e3a
Branch: refs/heads/master
Commit: 974e8e3abc9a3f60f516b24302edde5618a3cfc5
Parents: bec6dd6
Author: Bryan Call <bc...@apache.org>
Authored: Thu Jun 18 19:55:29 2015 -0700
Committer: Bryan Call <bc...@apache.org>
Committed: Thu Jun 18 19:55:29 2015 -0700
----------------------------------------------------------------------
iocore/net/I_NetVConnection.h | 6 +-
iocore/net/Net.cc | 8 +-
iocore/net/P_Net.h | 4 +-
iocore/net/P_UnixNet.h | 26 ++-
iocore/net/P_UnixNetVConnection.h | 32 ++-
iocore/net/UnixConnection.cc | 31 +--
iocore/net/UnixNet.cc | 354 +++++++++++++++++++++++----------
iocore/net/UnixNetVConnection.cc | 33 +--
mgmt/RecordsConfig.cc | 4 +-
proxy/PluginVC.cc | 11 +-
proxy/PluginVC.h | 5 +-
proxy/http/HttpClientSession.cc | 10 +-
proxy/spdy/SpdyCallbacks.cc | 2 +-
proxy/spdy/SpdyClientSession.cc | 2 +-
proxy/spdy/SpdyClientSession.h | 2 +-
15 files changed, 367 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/I_NetVConnection.h
----------------------------------------------------------------------
diff --git a/iocore/net/I_NetVConnection.h b/iocore/net/I_NetVConnection.h
index 5509a35..e9697be 100644
--- a/iocore/net/I_NetVConnection.h
+++ b/iocore/net/I_NetVConnection.h
@@ -429,9 +429,11 @@ public:
*/
virtual void cancel_inactivity_timeout() = 0;
- virtual void add_to_keep_alive_lru() = 0;
+ virtual void add_to_keep_alive_queue() = 0;
- virtual void remove_from_keep_alive_lru() = 0;
+ virtual void remove_from_keep_alive_queue() = 0;
+
+ virtual bool add_to_active_queue() = 0;
/** @return the current active_timeout value in nanosecs */
virtual ink_hrtime get_active_timeout() = 0;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/Net.cc
----------------------------------------------------------------------
diff --git a/iocore/net/Net.cc b/iocore/net/Net.cc
index 5c8acc3..fbdf03d 100644
--- a/iocore/net/Net.cc
+++ b/iocore/net/Net.cc
@@ -125,12 +125,12 @@ register_net_stats()
(int)inactivity_cop_lock_acquire_failure_stat, RecRawStatSyncSum);
RecRegisterRawStat(net_rsb, RECT_PROCESS, "proxy.process.net.dynamic_keep_alive_timeout_in_total", RECD_INT, RECP_NON_PERSISTENT,
- (int)keep_alive_lru_timeout_total_stat, RecRawStatSyncSum);
- NET_CLEAR_DYN_STAT(keep_alive_lru_timeout_total_stat);
+ (int)keep_alive_queue_timeout_total_stat, RecRawStatSyncSum);
+ NET_CLEAR_DYN_STAT(keep_alive_queue_timeout_total_stat);
RecRegisterRawStat(net_rsb, RECT_PROCESS, "proxy.process.net.dynamic_keep_alive_timeout_in_count", RECD_INT, RECP_NON_PERSISTENT,
- (int)keep_alive_lru_timeout_count_stat, RecRawStatSyncSum);
- NET_CLEAR_DYN_STAT(keep_alive_lru_timeout_count_stat);
+ (int)keep_alive_queue_timeout_count_stat, RecRawStatSyncSum);
+ NET_CLEAR_DYN_STAT(keep_alive_queue_timeout_count_stat);
RecRegisterRawStat(net_rsb, RECT_PROCESS, "proxy.process.net.default_inactivity_timeout_applied", RECD_INT, RECP_NON_PERSISTENT,
(int)default_inactivity_timeout_stat, RecRawStatSyncSum);
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/P_Net.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_Net.h b/iocore/net/P_Net.h
index c80365f..78aac91 100644
--- a/iocore/net/P_Net.h
+++ b/iocore/net/P_Net.h
@@ -50,8 +50,8 @@ enum Net_Stats {
socks_connections_unsuccessful_stat,
socks_connections_currently_open_stat,
inactivity_cop_lock_acquire_failure_stat,
- keep_alive_lru_timeout_total_stat,
- keep_alive_lru_timeout_count_stat,
+ keep_alive_queue_timeout_total_stat,
+ keep_alive_queue_timeout_count_stat,
default_inactivity_timeout_stat,
Net_Stat_Count
};
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/P_UnixNet.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h
index 112c357..c3794e6 100644
--- a/iocore/net/P_UnixNet.h
+++ b/iocore/net/P_UnixNet.h
@@ -185,8 +185,19 @@ public:
DList(UnixNetVConnection, cop_link) cop_list;
ASLLM(UnixNetVConnection, NetState, read, enable_link) read_enable_list;
ASLLM(UnixNetVConnection, NetState, write, enable_link) write_enable_list;
- Que(UnixNetVConnection, keep_alive_link) keep_alive_list;
- uint32_t keep_alive_lru_size;
+ Que(UnixNetVConnection, keep_alive_queue_link) keep_alive_queue;
+ uint32_t keep_alive_queue_size;
+ Que(UnixNetVConnection, active_queue_link) active_queue;
+ uint32_t active_queue_size;
+ uint32_t max_connections_per_thread_in;
+ uint32_t max_connections_active_per_thread_in;
+
+ // configuration settings for managing the active and keep-alive queues
+ uint32_t max_connections_in;
+ uint32_t max_connections_active_in;
+ uint32_t inactive_threashold_in;
+ uint32_t transaction_no_activity_timeout_in;
+ uint32_t keep_alive_no_activity_timeout_in;
time_t sec;
int cycles;
@@ -195,8 +206,19 @@ public:
int mainNetEvent(int event, Event *data);
int mainNetEventExt(int event, Event *data);
void process_enabled_list(NetHandler *);
+ void manage_keep_alive_queue();
+ bool manage_active_queue();
+ void add_to_keep_alive_queue(UnixNetVConnection *vc);
+ void remove_from_keep_alive_queue(UnixNetVConnection *vc);
+ bool add_to_active_queue(UnixNetVConnection *vc);
+ void remove_from_active_queue(UnixNetVConnection *vc);
+ void configure_per_thread();
NetHandler();
+
+private:
+ void _close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time,
+ int &total_idle_count);
};
static inline NetHandler *
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/P_UnixNetVConnection.h
----------------------------------------------------------------------
diff --git a/iocore/net/P_UnixNetVConnection.h b/iocore/net/P_UnixNetVConnection.h
index c8ce1eb..61cf925 100644
--- a/iocore/net/P_UnixNetVConnection.h
+++ b/iocore/net/P_UnixNetVConnection.h
@@ -149,8 +149,10 @@ public:
virtual void set_inactivity_timeout(ink_hrtime timeout_in);
virtual void cancel_active_timeout();
virtual void cancel_inactivity_timeout();
- virtual void add_to_keep_alive_lru();
- virtual void remove_from_keep_alive_lru();
+ virtual void add_to_keep_alive_queue();
+ virtual void remove_from_keep_alive_queue();
+ virtual bool add_to_active_queue();
+ virtual void remove_from_active_queue();
// The public interface is VIO::reenable()
virtual void reenable(VIO *vio);
@@ -225,17 +227,19 @@ public:
SLINKM(UnixNetVConnection, read, enable_link)
LINKM(UnixNetVConnection, write, ready_link)
SLINKM(UnixNetVConnection, write, enable_link)
- LINK(UnixNetVConnection, keep_alive_link);
+ LINK(UnixNetVConnection, keep_alive_queue_link);
+ LINK(UnixNetVConnection, active_queue_link);
ink_hrtime inactivity_timeout_in;
ink_hrtime active_timeout_in;
#ifdef INACTIVITY_TIMEOUT
Event *inactivity_timeout;
+ Event *activity_timeout;
#else
ink_hrtime next_inactivity_timeout_at;
+ ink_hrtime next_activity_timeout_at;
#endif
- Event *active_timeout;
EventIO ep;
NetHandler *nh;
unsigned int id;
@@ -310,9 +314,8 @@ UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout)
{
Debug("socket", "Set inactive timeout=%" PRId64 ", for NetVC=%p", timeout, this);
inactivity_timeout_in = timeout;
-#ifndef INACTIVITY_TIMEOUT
- next_inactivity_timeout_at = ink_get_hrtime() + timeout;
-#else
+#ifdef INACTIVITY_TIMEOUT
+
if (inactivity_timeout)
inactivity_timeout->cancel_action(this);
if (inactivity_timeout_in) {
@@ -332,6 +335,8 @@ UnixNetVConnection::set_inactivity_timeout(ink_hrtime timeout)
inactivity_timeout = 0;
} else
inactivity_timeout = 0;
+#else
+ next_inactivity_timeout_at = ink_get_hrtime() + timeout;
#endif
}
@@ -340,6 +345,7 @@ UnixNetVConnection::set_active_timeout(ink_hrtime timeout)
{
Debug("socket", "Set active timeout=%" PRId64 ", NetVC=%p", timeout, this);
active_timeout_in = timeout;
+#ifdef INACTIVITY_TIMEOUT
if (active_timeout)
active_timeout->cancel_action(this);
if (active_timeout_in) {
@@ -359,11 +365,15 @@ UnixNetVConnection::set_active_timeout(ink_hrtime timeout)
active_timeout = 0;
} else
active_timeout = 0;
+#else
+ next_activity_timeout_at = ink_get_hrtime() + timeout;
+#endif
}
TS_INLINE void
UnixNetVConnection::cancel_inactivity_timeout()
{
+ Debug("socket", "Cancel inactive timeout for NetVC=%p", this);
inactivity_timeout_in = 0;
#ifdef INACTIVITY_TIMEOUT
if (inactivity_timeout) {
@@ -372,7 +382,6 @@ UnixNetVConnection::cancel_inactivity_timeout()
inactivity_timeout = NULL;
}
#else
- Debug("socket", "Cancel inactive timeout for NetVC=%p", this);
next_inactivity_timeout_at = 0;
#endif
}
@@ -380,12 +389,17 @@ UnixNetVConnection::cancel_inactivity_timeout()
TS_INLINE void
UnixNetVConnection::cancel_active_timeout()
{
+ Debug("socket", "Cancel active timeout for NetVC=%p", this);
+ active_timeout_in = 0;
+#ifdef INACTIVITY_TIMEOUT
if (active_timeout) {
Debug("socket", "Cancel active timeout for NetVC=%p", this);
active_timeout->cancel_action(this);
active_timeout = NULL;
- active_timeout_in = 0;
}
+#else
+ next_activity_timeout_at = 0;
+#endif
}
TS_INLINE int
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/UnixConnection.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixConnection.cc b/iocore/net/UnixConnection.cc
index 303175a..bcf560e 100644
--- a/iocore/net/UnixConnection.cc
+++ b/iocore/net/UnixConnection.cc
@@ -392,24 +392,25 @@ Connection::apply_options(NetVCOptions const &opt)
}
void
-UnixNetVConnection::add_to_keep_alive_lru()
+UnixNetVConnection::add_to_keep_alive_queue()
{
- Debug("socket", "UnixNetVConnection::add_to_keep_alive_lru NetVC=%p", this);
- if (nh->keep_alive_list.in(this)) {
- nh->keep_alive_list.remove(this);
- nh->keep_alive_list.enqueue(this);
- } else {
- nh->keep_alive_list.enqueue(this);
- ++nh->keep_alive_lru_size;
- }
+ nh->add_to_keep_alive_queue(this);
}
void
-UnixNetVConnection::remove_from_keep_alive_lru()
+UnixNetVConnection::remove_from_keep_alive_queue()
{
- Debug("socket", "UnixNetVConnection::remove_from_keep_alive_lru NetVC=%p", this);
- if (nh->keep_alive_list.in(this)) {
- nh->keep_alive_list.remove(this);
- --nh->keep_alive_lru_size;
- }
+ nh->remove_from_keep_alive_queue(this);
+}
+
+/**
+ * Ask this connection's NetHandler (nh) to place it on the active queue.
+ *
+ * @return the result reported by NetHandler::add_to_active_queue() for this connection.
+ */
+bool
+UnixNetVConnection::add_to_active_queue()
+{
+  const bool admitted = nh->add_to_active_queue(this);
+  return admitted;
+}
+
+/** Ask this connection's NetHandler (nh) to drop it from the active queue. */
+void
+UnixNetVConnection::remove_from_active_queue()
+{
+  NetHandler *const handler = nh;
+  handler->remove_from_active_queue(this);
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/UnixNet.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc
index ff2f0d6..214ab12 100644
--- a/iocore/net/UnixNet.cc
+++ b/iocore/net/UnixNet.cc
@@ -43,15 +43,12 @@ int update_cop_config(const char *name, RecDataT data_type, RecData data, void *
class InactivityCop : public Continuation
{
public:
- InactivityCop(ProxyMutex *m)
- : Continuation(m), default_inactivity_timeout(0), total_connections_in(0), max_connections_in(0), connections_per_thread_in(0)
+ InactivityCop(ProxyMutex *m) : Continuation(m), default_inactivity_timeout(0)
{
SET_HANDLER(&InactivityCop::check_inactivity);
REC_ReadConfigInteger(default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout");
Debug("inactivity_cop", "default inactivity timeout is set to: %d", default_inactivity_timeout);
- REC_ReadConfigInt32(max_connections_in, "proxy.config.net.max_connections_in");
- RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_cop_config, (void *)this);
RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_cop_config, (void *)this);
}
@@ -63,14 +60,10 @@ public:
NetHandler &nh = *get_NetHandler(this_ethread());
Debug("inactivity_cop_check", "Checking inactivity on Thread-ID #%d", this_ethread()->id);
- total_connections_in = 0;
// Copy the list and use pop() to catch any closes caused by callbacks.
forl_LL(UnixNetVConnection, vc, nh.open_list)
{
if (vc->thread == this_ethread()) {
- if (vc->from_accept_thread == true) {
- ++total_connections_in;
- }
nh.cop_list.push(vc);
}
}
@@ -98,11 +91,11 @@ public:
}
if (vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at < now) {
- if (nh.keep_alive_list.in(vc)) {
+ if (nh.keep_alive_queue.in(vc)) {
// only stat if the connection is in keep-alive, there can be other inactivity timeouts
ink_hrtime diff = (now - (vc->next_inactivity_timeout_at - vc->inactivity_timeout_in)) / HRTIME_SECOND;
- NET_SUM_DYN_STAT(keep_alive_lru_timeout_total_stat, diff);
- NET_INCREMENT_DYN_STAT(keep_alive_lru_timeout_count_stat);
+ NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
+ NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
}
Debug("inactivity_cop_verbose", "vc: %p now: %" PRId64 " timeout at: %" PRId64 " timeout in: %" PRId64, vc, now,
vc->next_inactivity_timeout_at, vc->inactivity_timeout_in);
@@ -110,34 +103,21 @@ public:
}
}
- // Keep-alive LRU for incoming connections
- keep_alive_lru(nh, now, e);
+ // Cleanup the active and keep-alive queues periodically
+ nh.manage_active_queue();
+ nh.manage_keep_alive_queue();
return 0;
}
void
- set_max_connections(const int32_t x)
- {
- max_connections_in = x;
- }
- void
- set_connections_per_thread(const int32_t x)
- {
- connections_per_thread_in = x;
- }
- void
set_default_timeout(const int x)
{
default_inactivity_timeout = x;
}
private:
- void keep_alive_lru(NetHandler &nh, ink_hrtime now, Event *e);
int default_inactivity_timeout; // only used when one is not set for some bad reason
- int32_t total_connections_in;
- int32_t max_connections_in;
- int32_t connections_per_thread_in;
};
int
@@ -147,12 +127,6 @@ update_cop_config(const char *name, RecDataT data_type ATS_UNUSED, RecData data,
ink_assert(cop != NULL);
if (cop != NULL) {
- if (strcmp(name, "proxy.config.net.max_connections_in") == 0) {
- Debug("inactivity_cop_dynamic", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
- cop->set_max_connections(data.rec_int);
- cop->set_connections_per_thread(0);
- }
-
if (strcmp(name, "proxy.config.net.default_inactivity_timeout") == 0) {
Debug("inactivity_cop_dynamic", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int);
cop->set_default_timeout(data.rec_int);
@@ -162,77 +136,6 @@ update_cop_config(const char *name, RecDataT data_type ATS_UNUSED, RecData data,
return REC_ERR_OKAY;
}
-void
-InactivityCop::keep_alive_lru(NetHandler &nh, const ink_hrtime now, Event *e)
-{
- // maximum incoming connections is set to 0 then the feature is disabled
- if (max_connections_in == 0) {
- return;
- }
-
- if (connections_per_thread_in == 0) {
- // figure out the number of threads and calculate the number of connections per thread
- const int event_threads = eventProcessor.n_threads_for_type[ET_NET];
- const int ssl_threads = (ET_NET == SSLNetProcessor::ET_SSL) ? 0 : eventProcessor.n_threads_for_type[SSLNetProcessor::ET_SSL];
- connections_per_thread_in = max_connections_in / (event_threads + ssl_threads);
- }
-
- // calculate how many connections to close
- int32_t to_process = total_connections_in - connections_per_thread_in;
- if (to_process <= 0) {
- return;
- }
- to_process = min((int32_t)nh.keep_alive_lru_size, to_process);
-
- Debug("inactivity_cop_dynamic", "max cons: %d active: %d idle: %d process: %d"
- " net type: %d ssl type: %d",
- connections_per_thread_in, total_connections_in - nh.keep_alive_lru_size, nh.keep_alive_lru_size, to_process, ET_NET,
- SSLNetProcessor::ET_SSL);
-
- // loop over the non-active connections and try to close them
- UnixNetVConnection *vc = nh.keep_alive_list.head;
- UnixNetVConnection *vc_next = NULL;
- int closed = 0;
- int handle_event = 0;
- int total_idle_time = 0;
- int total_idle_count = 0;
- for (int32_t i = 0; i < to_process && vc != NULL; ++i, vc = vc_next) {
- vc_next = vc->keep_alive_link.next;
- if (vc->thread != this_ethread()) {
- continue;
- }
- MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread());
- if (!lock.is_locked()) {
- continue;
- }
- ink_hrtime diff = (now - (vc->next_inactivity_timeout_at - vc->inactivity_timeout_in)) / HRTIME_SECOND;
- if (diff > 0) {
- total_idle_time += diff;
- ++total_idle_count;
- NET_SUM_DYN_STAT(keep_alive_lru_timeout_total_stat, diff);
- NET_INCREMENT_DYN_STAT(keep_alive_lru_timeout_count_stat);
- }
- Debug("inactivity_cop_dynamic",
- "closing connection NetVC=%p idle: %u now: %" PRId64 " at: %" PRId64 " in: %" PRId64 " diff: %" PRId64, vc,
- nh.keep_alive_lru_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(vc->next_inactivity_timeout_at),
- ink_hrtime_to_sec(vc->inactivity_timeout_in), diff);
- if (vc->closed) {
- close_UnixNetVConnection(vc, e->ethread);
- ++closed;
- } else {
- vc->next_inactivity_timeout_at = now;
- nh.keep_alive_list.head->handleEvent(EVENT_IMMEDIATE, e);
- ++handle_event;
- }
- }
-
- if (total_idle_count > 0) {
- Debug("inactivity_cop_dynamic", "max cons: %d active: %d idle: %d already closed: %d, close event: %d"
- " mean idle: %d\n",
- connections_per_thread_in, total_connections_in - nh.keep_alive_lru_size - closed - handle_event, nh.keep_alive_lru_size,
- closed, handle_event, total_idle_time / total_idle_count);
- }
-}
#endif
PollCont::PollCont(ProxyMutex *m, int pt) : Continuation(m), net_handler(NULL), nextPollDescriptor(NULL), poll_timeout(pt)
@@ -387,11 +290,52 @@ initialize_thread_for_net(EThread *thread)
// NetHandler method definitions
-NetHandler::NetHandler() : Continuation(NULL), trigger_event(0), keep_alive_lru_size(0)
+NetHandler::NetHandler() : Continuation(NULL), trigger_event(0), keep_alive_queue_size(0), active_queue_size(0)
{
SET_HANDLER((NetContHandler)&NetHandler::startNetEvent);
}
+
+/**
+ * Records-config update callback for the NetHandler queue settings.
+ *
+ * Copies the new value of the changed record into the matching NetHandler field and, when one
+ * of the two connection limits changed, recomputes the per-thread limits through
+ * NetHandler::configure_per_thread().
+ *
+ * @param name      name of the record that changed.
+ * @param data_type unused.
+ * @param data      new value of the record; read through data.rec_int.
+ * @param cookie    the NetHandler to update (asserted to be non-NULL).
+ * @return REC_ERR_OKAY in every case.
+ */
+int
+update_nethandler_config(const char *name, RecDataT data_type ATS_UNUSED, RecData data, void *cookie)
+{
+  NetHandler *nh = static_cast<NetHandler *>(cookie);
+  ink_assert(nh != NULL);
+  bool update_per_thread_configuration = false;
+
+  if (nh != NULL) {
+    if (strcmp(name, "proxy.config.net.max_connections_in") == 0) {
+      Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int);
+      nh->max_connections_in = data.rec_int;
+      update_per_thread_configuration = true;
+    }
+    // The record is declared in RecordsConfig and read at startup as "max_connections_active_in".
+    // The previous spelling ("max_active_connections_in") never matched it, so a changed active
+    // limit was silently ignored.
+    if (strcmp(name, "proxy.config.net.max_connections_active_in") == 0) {
+      Debug("net_queue", "proxy.config.net.max_connections_active_in updated to %" PRId64, data.rec_int);
+      nh->max_connections_active_in = data.rec_int;
+      update_per_thread_configuration = true;
+    }
+    if (strcmp(name, "proxy.config.net.inactive_threashold_in") == 0) {
+      Debug("net_queue", "proxy.config.net.inactive_threashold_in updated to %" PRId64, data.rec_int);
+      nh->inactive_threashold_in = data.rec_int;
+    }
+    if (strcmp(name, "proxy.config.net.transaction_no_activity_timeout_in") == 0) {
+      Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int);
+      nh->transaction_no_activity_timeout_in = data.rec_int;
+    }
+    if (strcmp(name, "proxy.config.net.keep_alive_no_activity_timeout_in") == 0) {
+      Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int);
+      nh->keep_alive_no_activity_timeout_in = data.rec_int;
+    }
+  }
+
+  // Only the two connection limits feed the per-thread values, so only they trigger a recompute.
+  if (update_per_thread_configuration == true) {
+    nh->configure_per_thread();
+  }
+
+  return REC_ERR_OKAY;
+}
+
+
//
// Initialization here, in the thread in which we will be executing
// from now on.
@@ -399,6 +343,27 @@ NetHandler::NetHandler() : Continuation(NULL), trigger_event(0), keep_alive_lru_
int
NetHandler::startNetEvent(int event, Event *e)
{
+ // read configuration values and setup callbacks for when they change
+ REC_ReadConfigInt32(max_connections_in, "proxy.config.net.max_connections_in");
+ REC_ReadConfigInt32(max_connections_active_in, "proxy.config.net.max_connections_active_in");
+ REC_ReadConfigInt32(inactive_threashold_in, "proxy.config.net.inactive_threashold_in");
+ REC_ReadConfigInt32(transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in");
+ REC_ReadConfigInt32(keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in");
+
+ RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, (void *)this);
+ RecRegisterConfigUpdateCb("proxy.config.net.max_active_connections_in", update_nethandler_config, (void *)this);
+ RecRegisterConfigUpdateCb("proxy.config.net.inactive_threashold_in", update_nethandler_config, (void *)this);
+ RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, (void *)this);
+ RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, (void *)this);
+
+ Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", max_connections_in);
+ Debug("net_queue", "proxy.config.net.max_active_connections_in updated to %d", max_connections_active_in);
+ Debug("net_queue", "proxy.config.net.inactive_threashold_in updated to %d", inactive_threashold_in);
+ Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d", transaction_no_activity_timeout_in);
+ Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d", keep_alive_no_activity_timeout_in);
+
+ configure_per_thread();
+
(void)event;
SET_HANDLER((NetContHandler)&NetHandler::mainNetEvent);
e->schedule_every(-HRTIME_MSECONDS(net_event_period));
@@ -597,3 +562,182 @@ NetHandler::mainNetEvent(int event, Event *e)
return EVENT_CONT;
}
+
+/**
+ * Try to make room in the active queue.
+ *
+ * Returns immediately when the queue is below max_connections_active_per_thread_in. Otherwise the
+ * queue is walked from the head and every connection whose inactivity or active timeout has
+ * already expired is handed to _close_vc(); the walk stops as soon as the queue is below the
+ * limit again.
+ *
+ * @return true when the active queue is below the per-thread limit, false when it is still full
+ *         after the walk.
+ */
+bool
+NetHandler::manage_active_queue()
+{
+  const int total_connections_in = active_queue_size + keep_alive_queue_size;
+  Debug("net_queue", "max_connections_per_thread_in: %d max_connections_active_per_thread_in: %d total_connections_in: %d "
+                     "active_queue_size: %d keep_alive_queue_size: %d",
+        max_connections_per_thread_in, max_connections_active_per_thread_in, total_connections_in, active_queue_size,
+        keep_alive_queue_size);
+
+  if (max_connections_active_per_thread_in > active_queue_size) {
+    return true;
+  }
+
+  ink_hrtime now = ink_get_hrtime();
+
+  // loop over the active connections and try to close the ones that have timed out
+  UnixNetVConnection *vc = active_queue.head;
+  UnixNetVConnection *vc_next = NULL;
+  int closed = 0;
+  int handle_event = 0;
+  int total_idle_time = 0;
+  int total_idle_count = 0;
+  for (; vc != NULL; vc = vc_next) {
+    // Fetch the successor first: _close_vc() may unlink and free vc. Without this assignment the
+    // loop stopped after the head element.
+    vc_next = vc->active_queue_link.next;
+
+    // Only connections with a configured timeout that has already passed are eligible. The
+    // previous test (timeout > now) selected exactly the connections that were still healthy.
+    if ((vc->inactivity_timeout_in && vc->next_inactivity_timeout_at <= now) ||
+        (vc->active_timeout_in && vc->next_activity_timeout_at <= now)) {
+      _close_vc(vc, now, handle_event, closed, total_idle_time, total_idle_count);
+    }
+    if (max_connections_active_per_thread_in > active_queue_size) {
+      return true;
+    }
+  }
+
+  return false; // failed to make room in the queue, all connections are active
+}
+
+/**
+ * Derive the per-thread connection limits from the global ones.
+ *
+ * The divisor is the number of ET_NET threads plus, when SSL uses its own thread type, the
+ * number of ET_SSL threads.
+ */
+void
+NetHandler::configure_per_thread()
+{
+  const int net_threads = eventProcessor.n_threads_for_type[ET_NET];
+  // SSL threads only count separately when they are not the ET_NET threads themselves.
+  const int ssl_threads = (ET_NET == SSLNetProcessor::ET_SSL) ? 0 : eventProcessor.n_threads_for_type[SSLNetProcessor::ET_SSL];
+  const int thread_total = net_threads + ssl_threads;
+
+  max_connections_per_thread_in = max_connections_in / thread_total;
+  max_connections_active_per_thread_in = max_connections_active_in / thread_total;
+
+  Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in, thread_total);
+  Debug("net_queue", "max_connections_active_per_thread_in updated to %d threads: %d", max_connections_active_per_thread_in,
+        thread_total);
+}
+
+/**
+ * Close keep-alive connections while this thread is over its connection limit.
+ *
+ * Does nothing when active_queue_size + keep_alive_queue_size is within
+ * max_connections_per_thread_in. Otherwise the keep-alive queue is walked from the head and each
+ * connection is handed to _close_vc() until the total is back within the limit or the queue has
+ * been walked completely.
+ */
+void
+NetHandler::manage_keep_alive_queue()
+{
+  uint32_t total_connections_in = active_queue_size + keep_alive_queue_size;
+  ink_hrtime now = ink_get_hrtime();
+
+  Debug("net_queue", "max_connections_per_thread_in: %d total_connections_in: %d active_queue_size: %d keep_alive_queue_size: %d",
+        max_connections_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size);
+
+  if (total_connections_in <= max_connections_per_thread_in) {
+    return;
+  }
+
+  // loop over the non-active connections and try to close them
+  UnixNetVConnection *vc_next = NULL;
+  int closed = 0;
+  int handle_event = 0;
+  int total_idle_time = 0;
+  int total_idle_count = 0;
+  for (UnixNetVConnection *vc = keep_alive_queue.head; vc != NULL; vc = vc_next) {
+    // Fetch the successor on the keep-alive link before _close_vc() can unlink or free vc.
+    // Reading active_queue_link here (as before) ended the walk after the first element, because
+    // connections in this queue are not linked through the active queue.
+    vc_next = vc->keep_alive_queue_link.next;
+    _close_vc(vc, now, handle_event, closed, total_idle_time, total_idle_count);
+
+    total_connections_in = active_queue_size + keep_alive_queue_size;
+    if (total_connections_in <= max_connections_per_thread_in) {
+      break;
+    }
+  }
+
+  if (total_idle_count > 0) {
+    Debug("net_queue", "max cons: %d active: %d idle: %d already closed: %d, close event: %d mean idle: %d\n",
+          max_connections_per_thread_in, total_connections_in, keep_alive_queue_size, closed, handle_event,
+          total_idle_time / total_idle_count);
+  }
+}
+
+/**
+ * Try to close one connection on behalf of the queue managers.
+ *
+ * The connection is skipped when it belongs to another thread or when its mutex cannot be taken
+ * without blocking. A connection already marked closed is released with
+ * close_UnixNetVConnection(); any other connection gets its inactivity deadline set to now and
+ * is sent EVENT_IMMEDIATE.
+ *
+ * @param vc               connection to close.
+ * @param now              current time; used for the idle statistic and as the forced deadline.
+ * @param handle_event     incremented when the connection was sent EVENT_IMMEDIATE.
+ * @param closed           incremented when an already-closed connection was released.
+ * @param total_idle_time  accumulates the idle seconds of the connections handled.
+ * @param total_idle_count counts the connections that contributed to total_idle_time.
+ */
+void
+NetHandler::_close_vc(UnixNetVConnection *vc, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time,
+                      int &total_idle_count)
+{
+  if (vc->thread != this_ethread()) {
+    return;
+  }
+  MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread());
+  if (!lock.is_locked()) {
+    return;
+  }
+  // Idle time in seconds: now minus the moment the inactivity timeout was last armed.
+  ink_hrtime diff = (now - (vc->next_inactivity_timeout_at - vc->inactivity_timeout_in)) / HRTIME_SECOND;
+  if (diff > 0) {
+    total_idle_time += diff;
+    ++total_idle_count;
+    NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff);
+    NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat);
+  }
+  Debug("net_queue", "closing connection NetVC=%p idle: %u now: %" PRId64 " at: %" PRId64 " in: %" PRId64 " diff: %" PRId64, vc,
+        keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(vc->next_inactivity_timeout_at),
+        ink_hrtime_to_sec(vc->inactivity_timeout_in), diff);
+  if (vc->closed) {
+    close_UnixNetVConnection(vc, this_ethread());
+    ++closed;
+  } else {
+    vc->next_inactivity_timeout_at = now;
+    // Deliver the event to the connection being closed. The previous code always signalled
+    // keep_alive_queue.head, which is a different connection (or NULL) whenever vc comes from
+    // the active queue or is not at the head of the keep-alive queue.
+    // NOTE(review): no Event object is available here, so NULL is passed; confirm the
+    // connection's handler tolerates a NULL event on every path.
+    vc->handleEvent(EVENT_IMMEDIATE, NULL);
+    ++handle_event;
+  }
+}
+
+/**
+ * Put a connection at the end of the keep-alive queue.
+ *
+ * A connection that is already queued is re-enqueued; any other connection is first taken off
+ * the active queue and counted as a new keep-alive entry. The queue is trimmed afterwards via
+ * manage_keep_alive_queue().
+ *
+ * @param vc connection to enqueue.
+ */
+void
+NetHandler::add_to_keep_alive_queue(UnixNetVConnection *vc)
+{
+  Debug("net_queue", "NetVC: %p", vc);
+
+  const bool already_queued = keep_alive_queue.in(vc);
+  if (!already_queued) {
+    // coming from the active queue or from no queue at all: it is a new entry here
+    remove_from_active_queue(vc);
+    ++keep_alive_queue_size;
+  } else {
+    // unlink it so the enqueue below re-inserts it
+    keep_alive_queue.remove(vc);
+  }
+  keep_alive_queue.enqueue(vc);
+
+  // close connections if the thread is now over its limit
+  manage_keep_alive_queue();
+}
+
+/**
+ * Take a connection off the keep-alive queue; a connection that is not queued is left untouched.
+ *
+ * @param vc connection to remove.
+ */
+void
+NetHandler::remove_from_keep_alive_queue(UnixNetVConnection *vc)
+{
+  Debug("net_queue", "NetVC: %p", vc);
+  if (!keep_alive_queue.in(vc)) {
+    return;
+  }
+  keep_alive_queue.remove(vc);
+  --keep_alive_queue_size;
+}
+
+/**
+ * Put a connection at the end of the active queue if the queue has room.
+ *
+ * manage_active_queue() is consulted first; when it reports that no room could be made the
+ * connection is not queued. Otherwise a connection that is already queued is re-enqueued, and
+ * any other connection is taken off the keep-alive queue and counted as a new active entry.
+ *
+ * @param vc connection to enqueue.
+ * @return true when the connection is on the active queue afterwards, false when there was no room.
+ */
+bool
+NetHandler::add_to_active_queue(UnixNetVConnection *vc)
+{
+  Debug("net_queue", "NetVC: %p", vc);
+  Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size: %d",
+        max_connections_per_thread_in, active_queue_size, keep_alive_queue_size);
+
+  // give the queue manager a chance to free a slot before admitting the connection
+  const bool has_room = manage_active_queue();
+  if (!has_room) {
+    return false;
+  }
+
+  if (!active_queue.in(vc)) {
+    // coming from the keep-alive queue or from no queue at all: it is a new entry here
+    remove_from_keep_alive_queue(vc);
+    ++active_queue_size;
+  } else {
+    // unlink it so the enqueue below re-inserts it
+    active_queue.remove(vc);
+  }
+  active_queue.enqueue(vc);
+
+  return true;
+}
+
+/**
+ * Take a connection off the active queue; a connection that is not queued is left untouched.
+ *
+ * @param vc connection to remove.
+ */
+void
+NetHandler::remove_from_active_queue(UnixNetVConnection *vc)
+{
+  Debug("net_queue", "NetVC: %p", vc);
+  if (!active_queue.in(vc)) {
+    return;
+  }
+  active_queue.remove(vc);
+  --active_queue_size;
+}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/iocore/net/UnixNetVConnection.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixNetVConnection.cc b/iocore/net/UnixNetVConnection.cc
index 1a3ccea..c4ec29c 100644
--- a/iocore/net/UnixNetVConnection.cc
+++ b/iocore/net/UnixNetVConnection.cc
@@ -103,14 +103,16 @@ close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
vc->inactivity_timeout->cancel_action(vc);
vc->inactivity_timeout = NULL;
}
-#else
- vc->next_inactivity_timeout_at = 0;
-#endif
- vc->inactivity_timeout_in = 0;
if (vc->active_timeout) {
vc->active_timeout->cancel_action(vc);
vc->active_timeout = NULL;
}
+#else
+ vc->next_inactivity_timeout_at = 0;
+ vc->next_activity_timeout_at = 0;
+#endif
+ vc->inactivity_timeout_in = 0;
+
vc->active_timeout_in = 0;
nh->open_list.remove(vc);
nh->cop_list.remove(vc);
@@ -124,7 +126,8 @@ close_UnixNetVConnection(UnixNetVConnection *vc, EThread *t)
nh->write_enable_list.remove(vc);
vc->write.in_enabled_list = 0;
}
- vc->remove_from_keep_alive_lru();
+ vc->remove_from_keep_alive_queue();
+ vc->remove_from_active_queue();
vc->free(t);
}
@@ -842,11 +845,11 @@ UnixNetVConnection::reenable_re(VIO *vio)
UnixNetVConnection::UnixNetVConnection()
: closed(0), inactivity_timeout_in(0), active_timeout_in(0),
#ifdef INACTIVITY_TIMEOUT
- inactivity_timeout(NULL),
+ inactivity_timeout(NULL), active_timeout(NULL),
#else
- next_inactivity_timeout_at(0),
+ next_inactivity_timeout_at(0), next_activity_timeout_at(0),
#endif
- active_timeout(NULL), nh(NULL), id(0), flags(0), recursion(0), submit_time(0), oob_ptr(0), from_accept_thread(false)
+ nh(NULL), id(0), flags(0), recursion(0), submit_time(0), oob_ptr(0), from_accept_thread(false)
{
memset(&local_addr, 0, sizeof local_addr);
memset(&server_addr, 0, sizeof server_addr);
@@ -1059,7 +1062,7 @@ UnixNetVConnection::mainEvent(int event, Event *e)
if (!hlock.is_locked() || !rlock.is_locked() || !wlock.is_locked() ||
(read.vio.mutex.m_ptr && rlock.get_mutex() != read.vio.mutex.m_ptr) ||
(write.vio.mutex.m_ptr && wlock.get_mutex() != write.vio.mutex.m_ptr)) {
-#ifndef INACTIVITY_TIMEOUT
+#ifdef INACTIVITY_TIMEOUT
if (e == active_timeout)
#endif
e->schedule_in(HRTIME_MSECONDS(net_retry_delay));
@@ -1081,6 +1084,10 @@ UnixNetVConnection::mainEvent(int event, Event *e)
if (e == inactivity_timeout) {
signal_event = VC_EVENT_INACTIVITY_TIMEOUT;
signal_timeout = &inactivity_timeout;
+ } else if {
+ ink_assert(e == active_timeout);
+ signal_event = VC_EVENT_ACTIVE_TIMEOUT;
+ signal_timeout = &active_timeout;
}
#else
if (event == EVENT_IMMEDIATE) {
@@ -1093,11 +1100,7 @@ UnixNetVConnection::mainEvent(int event, Event *e)
signal_timeout_at = &next_inactivity_timeout_at;
}
#endif
- else {
- ink_assert(e == active_timeout);
- signal_event = VC_EVENT_ACTIVE_TIMEOUT;
- signal_timeout = &active_timeout;
- }
+
*signal_timeout = 0;
*signal_timeout_at = 0;
writer_cont = write.vio._cont;
@@ -1231,7 +1234,9 @@ UnixNetVConnection::free(EThread *t)
ink_assert(!write.ready_link.prev && !write.ready_link.next);
ink_assert(!write.enable_link.next);
ink_assert(!link.next && !link.prev);
+#ifdef INACTIVITY_TIMEOUT
ink_assert(!active_timeout);
+#endif
ink_assert(con.fd == NO_FD);
ink_assert(t == this_ethread());
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/mgmt/RecordsConfig.cc
----------------------------------------------------------------------
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index 06a8179..dc02ff0 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -462,7 +462,9 @@ static const RecordElement RecordsConfig[] =
,
{RECT_CONFIG, "proxy.config.http.attach_server_session_to_client", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-1]", RECA_NULL}
,
- {RECT_CONFIG, "proxy.config.net.max_connections_in", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
+ {RECT_CONFIG, "proxy.config.net.max_connections_in", RECD_INT, "30000", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
+ ,
+ {RECT_CONFIG, "proxy.config.net.max_connections_active_in", RECD_INT, "10000", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL}
,
// ###########################
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/PluginVC.cc
----------------------------------------------------------------------
diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc
index 20e5888..66f2d1b 100644
--- a/proxy/PluginVC.cc
+++ b/proxy/PluginVC.cc
@@ -865,17 +865,24 @@ PluginVC::get_inactivity_timeout()
}
void
-PluginVC::add_to_keep_alive_lru()
+PluginVC::add_to_keep_alive_queue()
{
// do nothing
}
void
-PluginVC::remove_from_keep_alive_lru()
+PluginVC::remove_from_keep_alive_queue()
{
// do nothing
}
+bool
+PluginVC::add_to_active_queue()
+{
+ // PluginVC keeps no active queue; report success, since callers close the session when this returns false
+ return true;
+}
+
SOCKET
PluginVC::get_socket()
{
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/PluginVC.h
----------------------------------------------------------------------
diff --git a/proxy/PluginVC.h b/proxy/PluginVC.h
index ef2a5d5..3352ac9 100644
--- a/proxy/PluginVC.h
+++ b/proxy/PluginVC.h
@@ -93,8 +93,9 @@ public:
virtual void set_inactivity_timeout(ink_hrtime timeout_in);
virtual void cancel_active_timeout();
virtual void cancel_inactivity_timeout();
- virtual void add_to_keep_alive_lru();
- virtual void remove_from_keep_alive_lru();
+ virtual void add_to_keep_alive_queue();
+ virtual void remove_from_keep_alive_queue();
+ virtual bool add_to_active_queue();
virtual ink_hrtime get_active_timeout();
virtual ink_hrtime get_inactivity_timeout();
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/http/HttpClientSession.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpClientSession.cc b/proxy/http/HttpClientSession.cc
index 34afa84..fdf4d37 100644
--- a/proxy/http/HttpClientSession.cc
+++ b/proxy/http/HttpClientSession.cc
@@ -121,6 +121,13 @@ HttpClientSession::new_transaction()
ink_assert(current_reader == NULL);
PluginIdentity *pi = dynamic_cast<PluginIdentity *>(client_vc);
+ if (client_vc->add_to_active_queue() == false) {
+ // no room in the active queue close the connection
+ this->do_io_close();
+ return;
+ }
+
+
// Defensive programming, make sure nothing persists across
// connection re-use
half_close = false;
@@ -131,7 +138,6 @@ HttpClientSession::new_transaction()
transact_count++;
DebugHttpSsn("[%" PRId64 "] Starting transaction %d using sm [%" PRId64 "]", con_id, transact_count, current_reader->sm_id);
- client_vc->remove_from_keep_alive_lru();
current_reader->attach_client_session(this, sm_reader);
if (pi) {
// it's a plugin VC of some sort with identify information.
@@ -510,7 +516,7 @@ HttpClientSession::release(IOBufferReader *r)
SET_HANDLER(&HttpClientSession::state_keep_alive);
ka_vio = this->do_io_read(this, INT64_MAX, read_buffer);
ink_assert(slave_ka_vio != ka_vio);
- client_vc->add_to_keep_alive_lru();
+ client_vc->add_to_keep_alive_queue();
client_vc->set_inactivity_timeout(HRTIME_SECONDS(ka_in));
client_vc->cancel_active_timeout();
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/spdy/SpdyCallbacks.cc
----------------------------------------------------------------------
diff --git a/proxy/spdy/SpdyCallbacks.cc b/proxy/spdy/SpdyCallbacks.cc
index e7b529a..45213a3 100644
--- a/proxy/spdy/SpdyCallbacks.cc
+++ b/proxy/spdy/SpdyCallbacks.cc
@@ -316,7 +316,7 @@ spdy_on_ctrl_recv_callback(spdylay_session *session, spdylay_frame_type type, sp
req->append_nv(frame->syn_stream.nv);
req->append_nv(no_keep_alive);
sm->req_map[stream_id] = req;
- sm->vc->add_to_keep_alive_lru();
+ sm->vc->add_to_active_queue();
spdy_process_syn_stream_frame(sm, req);
break;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/spdy/SpdyClientSession.cc
----------------------------------------------------------------------
diff --git a/proxy/spdy/SpdyClientSession.cc b/proxy/spdy/SpdyClientSession.cc
index 4317d9a..c6f1822 100644
--- a/proxy/spdy/SpdyClientSession.cc
+++ b/proxy/spdy/SpdyClientSession.cc
@@ -113,7 +113,7 @@ SpdyClientSession::init(NetVConnection *netvc)
start_time = TShrtime();
this->vc->set_inactivity_timeout(HRTIME_SECONDS(spdy_accept_no_activity_timeout));
- vc->add_to_keep_alive_lru();
+ vc->add_to_keep_alive_queue();
SET_HANDLER(&SpdyClientSession::state_session_start);
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/974e8e3a/proxy/spdy/SpdyClientSession.h
----------------------------------------------------------------------
diff --git a/proxy/spdy/SpdyClientSession.h b/proxy/spdy/SpdyClientSession.h
index 8c3750f..349942a 100644
--- a/proxy/spdy/SpdyClientSession.h
+++ b/proxy/spdy/SpdyClientSession.h
@@ -181,7 +181,7 @@ public:
this->req_map.erase(streamId);
}
if (req_map.empty() == true) {
- vc->add_to_keep_alive_lru();
+ vc->add_to_keep_alive_queue();
}
}