You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zy...@apache.org on 2012/05/23 04:03:04 UTC
git commit: TS-1222 single tcp connection will limit the cluster
throughput
Updated Branches:
refs/heads/master bad6bf092 -> e67b0e403
TS-1222 single tcp connection will limit the cluster throughput
This patch makes two changes:
1. Make multiple ClusterHandlers per ClusterMachine.
2. Limit cluster request handling so that it works only in ET_CLUSTER.
After this patch, you will get far better performance in clustering,
and we expect that it can be used in 10G networks.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/e67b0e40
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/e67b0e40
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/e67b0e40
Branch: refs/heads/master
Commit: e67b0e4038d9144c02cf41d6bf06e42083295a52
Parents: bad6bf0
Author: Chen Bin <ku...@taobao.com>
Authored: Tue May 22 14:15:41 2012 +0800
Committer: Zhao Yongming <mi...@gmail.com>
Committed: Wed May 23 10:00:47 2012 +0800
----------------------------------------------------------------------
iocore/cache/P_CacheInternal.h | 13 ++---
iocore/cluster/ClusterAPI.cc | 8 ++--
iocore/cluster/ClusterCache.cc | 72 ++++++++++++++-----------
iocore/cluster/ClusterConfig.cc | 15 ++++--
iocore/cluster/ClusterHandler.cc | 48 +++++++++++------
iocore/cluster/ClusterHandlerBase.cc | 74 +++++++++++++++++--------
iocore/cluster/ClusterLoadMonitor.cc | 17 +++---
iocore/cluster/ClusterMachine.cc | 18 ++++++-
iocore/cluster/ClusterProcessor.cc | 39 +++++++-------
iocore/cluster/ClusterRPC.cc | 47 ++++++++---------
iocore/cluster/ClusterVConnection.cc | 17 +++----
iocore/cluster/P_ClusterCache.h | 39 +++++++------
iocore/cluster/P_ClusterCacheInternal.h | 1 +
iocore/cluster/P_ClusterHandler.h | 5 ++-
iocore/cluster/P_ClusterInline.h | 3 +-
iocore/cluster/P_ClusterLoadMonitor.h | 6 +-
iocore/cluster/P_ClusterMachine.h | 13 +++--
iocore/hostdb/HostDB.cc | 12 ++--
iocore/net/UnixNetAccept.cc | 2 +-
mgmt/RecordsConfig.cc | 2 +
proxy/http/HttpCacheSM.cc | 3 +-
21 files changed, 266 insertions(+), 188 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cache/P_CacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h
index 69e298f..2dcee3e 100644
--- a/iocore/cache/P_CacheInternal.h
+++ b/iocore/cache/P_CacheInternal.h
@@ -1165,10 +1165,10 @@ CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag
{
(void) expected_size;
#ifdef CLUSTER_CACHE
- ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
-
- if (m && (cache_clustering_enabled > 0)) {
- return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
+ if (cache_clustering_enabled > 0) {
+ ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
+ if (m)
+ return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
key, frag_type, options, pin_in_cache,
CACHE_OPEN_WRITE, key, (CacheURL *) 0,
(CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len);
@@ -1278,10 +1278,9 @@ CacheProcessor::open_read_internal(int opcode,
url_md5 = *key;
}
ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
- ClusterMachine *owner_machine = m ? m : this_cluster_machine();
- if (owner_machine != this_cluster_machine()) {
- return Cluster_read(owner_machine, opcode, cont, buf, url,
+ if (m) {
+ return Cluster_read(m, opcode, cont, buf, url,
request, params, key, pin_in_cache, frag_type, hostname, host_len);
} else {
if ((opcode == CACHE_OPEN_READ_LONG)
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterAPI.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterAPI.cc b/iocore/cluster/ClusterAPI.cc
index 5e64719..7691e90 100644
--- a/iocore/cluster/ClusterAPI.cc
+++ b/iocore/cluster/ClusterAPI.cc
@@ -487,9 +487,9 @@ TSDeleteClusterRPCFunction(TSClusterRPCHandle_t * rpch)
* Cluster calls us here for each RPC API function.
*/
void
-default_api_ClusterFunction(ClusterMachine * m, void *data, int len)
+default_api_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
- Debug("cluster_api", "default_api_ClusterFunction: [%u.%u.%u.%u] data %p len %d", DOT_SEPARATED(m->ip), data, len);
+ Debug("cluster_api", "default_api_ClusterFunction: [%u.%u.%u.%u] data %p len %d", DOT_SEPARATED(ch->machine->ip), data, len);
TSClusterRPCMsg_t *msg = (TSClusterRPCMsg_t *) data;
RPCHandle_t *rpch = (RPCHandle_t *) & msg->m_handle;
@@ -501,7 +501,7 @@ default_api_ClusterFunction(ClusterMachine * m, void *data, int len)
if (cluster_function < API_END_CLUSTER_FUNCTION && RPC_Functions[cluster_function]) {
int msg_data_len = len - SIZEOF_RPC_MSG_LESS_DATA;
- TSNodeHandle_t nh = IP_TO_NODE_HANDLE(m->ip);
+ TSNodeHandle_t nh = IP_TO_NODE_HANDLE(ch->machine->ip);
(*RPC_Functions[cluster_function]) (&nh, msg, msg_data_len);
} else {
clusterProcessor.free_remote_data((char *) data, len);
@@ -572,7 +572,7 @@ TSSendClusterRPC(TSNodeHandle_t * nh, TSClusterRPCMsg_t * msg)
int len = c->len - sizeof(OutgoingControl *);
ink_release_assert((size_t) len >= sizeof(TSClusterRPCMsg_t));
- clusterProcessor.invoke_remote(m, rpch->u.internal.cluster_function,
+ clusterProcessor.invoke_remote(m->pop_ClusterHandler(), rpch->u.internal.cluster_function,
msg, len, (CLUSTER_OPT_STEAL | CLUSTER_OPT_DATA_IS_OCONTROL));
Debug("cluster_api", "TSSendClusterRPC: msg %p dlen %d [%u.%u.%u.%u] sent", msg, len, DOT_SEPARATED(ipaddr.s_addr));
} else {
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterCache.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterCache.cc b/iocore/cluster/ClusterCache.cc
index 3b06f85..51f375e 100644
--- a/iocore/cluster/ClusterCache.cc
+++ b/iocore/cluster/ClusterCache.cc
@@ -379,6 +379,7 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
CacheContinuation *cc = 0;
Action *act = 0;
char *msg = 0;
+ ClusterHandler *ch = mp->pop_ClusterHandler();
/////////////////////////////////////////////////////////////////////
// Unconditionally map open read buffer interfaces to open read.
@@ -396,8 +397,12 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
break;
}
+ if (!ch)
+ goto no_send_exit;
+
if (c) {
cc = cacheContAllocator_alloc();
+ cc->ch = ch;
cc->target_machine = mp;
cc->request_opcode = opcode;
cc->mutex = c->mutex;
@@ -610,7 +615,7 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op");
#endif
- clusterProcessor.invoke_remote(mp,
+ clusterProcessor.invoke_remote(ch,
op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION
: CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len);
@@ -753,7 +758,7 @@ CacheContinuation::lookupOpenWriteVC()
msg.seq_number = seq_number;
msg.token = vc->token;
- cache_op_result_ClusterFunction(from, (void *) &msg, msglen);
+ cache_op_result_ClusterFunction(ch, (void *) &msg, msglen);
} else {
// Miss, establish local VC and send remote open_write request
@@ -898,7 +903,7 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
if (event != CLUSTER_EVENT_OPEN_EXISTS) {
// Send request message
- clusterProcessor.invoke_remote(from,
+ clusterProcessor.invoke_remote(ch,
(op_needs_marshalled_coi(request_opcode) ?
CACHE_OP_MALLOCED_CLUSTER_FUNCTION :
CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen());
@@ -1057,7 +1062,7 @@ init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMac
}
void
-cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
+cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
{
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
@@ -1084,6 +1089,7 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
c->request_opcode = opcode;
c->token.clear();
c->start_time = ink_get_hrtime();
+ c->ch = ch;
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::replyOpEvent);
@@ -1101,9 +1107,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
case CACHE_OPEN_READ:
{
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
- init_from_short(c, msg, from);
+ init_from_short(c, msg, ch->machine);
Debug("cache_msg",
- "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+ "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
//
// Establish the remote side of the ClusterVConnection
//
@@ -1157,9 +1163,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
- init_from_long(c, msg, from);
+ init_from_long(c, msg, ch->machine);
Debug("cache_msg",
- "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+ "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
#endif
@@ -1247,9 +1253,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
case CACHE_OPEN_WRITE:
{
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
- init_from_short(c, msg, from);
+ init_from_short(c, msg, ch->machine);
Debug("cache_msg",
- "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+ "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
#endif
@@ -1302,9 +1308,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
- init_from_long(c, msg, from);
+ init_from_long(c, msg, ch->machine);
Debug("cache_msg",
- "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+ "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
#endif
@@ -1373,9 +1379,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
case CACHE_REMOVE:
{
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
- init_from_short(c, msg, from);
+ init_from_short(c, msg, ch->machine);
Debug("cache_msg",
- "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+ "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
#endif
@@ -1400,9 +1406,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
case CACHE_LINK:
{
CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
- init_from_short_2(c, msg, from);
+ init_from_short_2(c, msg, ch->machine);
Debug("cache_msg",
- "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+ "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_link");
#endif
@@ -1427,9 +1433,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
case CACHE_DEREF:
{
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
- init_from_short(c, msg, from);
+ init_from_short(c, msg, ch->machine);
Debug("cache_msg",
- "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, from);
+ "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
#endif
@@ -1459,9 +1465,9 @@ cache_op_ClusterFunction(ClusterMachine * from, void *data, int len)
}
void
-cache_op_malloc_ClusterFunction(ClusterMachine * from, void *data, int len)
+cache_op_malloc_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
- cache_op_ClusterFunction(from, data, len);
+ cache_op_ClusterFunction(ch, data, len);
// We own the message data, free it back to the Cluster subsystem
clusterProcessor.free_remote_data((char *) data, len);
}
@@ -1779,7 +1785,7 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
// Transmit reply message and object data in same cluster message
Debug("cache_proto", "Sending reply/data seqno=%d buflen=%"PRId64,
seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
- clusterProcessor.invoke_remote_data(from,
+ clusterProcessor.invoke_remote_data(ch,
CACHE_OP_RESULT_CLUSTER_FUNCTION,
(void *) msg, (flen + len),
readahead_data,
@@ -1787,7 +1793,7 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
&CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
} else {
Debug("cache_proto", "Sending reply seqno=%d, (this=%p)", seq_number, this);
- clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
+ clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION,
(void *) msg, (flen + len), CLUSTER_OPT_STEAL);
}
@@ -1966,9 +1972,8 @@ CacheContinuation::handleDisposeEvent(int event, CacheContinuation * cc)
// unmarshals the result and calls a continuation in the requesting thread.
/////////////////////////////////////////////////////////////////////////////
void
-cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
+cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l)
{
- (void) from;
////////////////////////////////////////////////////////
// Note: we are running on the ET_CACHE_CONT_SM thread
////////////////////////////////////////////////////////
@@ -2036,7 +2041,7 @@ cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
}
// See if this response is still expected (expected case == yes)
- unsigned int hash = FOLDHASH(from->ip, msg->seq_number);
+ unsigned int hash = FOLDHASH(ch->machine->ip, msg->seq_number);
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
if (MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], thread)) {
@@ -2044,7 +2049,7 @@ cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
// Find it in pending list
CacheContinuation *c = find_cache_continuation(msg->seq_number,
- from->ip);
+ ch->machine->ip);
if (!c) {
// Reply took to long, response no longer expected.
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
@@ -2085,7 +2090,7 @@ cache_op_result_ClusterFunction(ClusterMachine * from, void *d, int l)
CacheContinuation * c = CacheContinuation::cacheContAllocator_alloc();
c->mutex = new_ProxyMutex();
c->seq_number = msg->seq_number;
- c->target_ip = from->ip;
+ c->target_ip = ch->machine->ip;
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::handleReplyEvent);
c->start_time = ink_get_hrtime();
@@ -2532,6 +2537,9 @@ CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
if (!m)
return (Action *) 0;
+ ClusterHandler *ch = m->pop_ClusterHandler();
+ if (!ch)
+ return (Action *) 0;
// If we do not have a continuation, build one
@@ -2541,6 +2549,7 @@ CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
c->probe_depth = probe_depth;
memcpy(c->past_probes, past_probes, sizeof(past_probes));
}
+ c->ch = ch;
// Save hostname data in case we need to do a local lookup.
if (hostname && hostname_len) {
// Alloc buffer, copy hostname data and attach to continuation
@@ -2601,7 +2610,7 @@ CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg(msg.seq_number, 0, "cache_lookup");
#endif
- clusterProcessor.invoke_remote(m, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
+ clusterProcessor.invoke_remote(c->ch, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
return &c->action;
}
@@ -2613,9 +2622,8 @@ CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
// continuation set to CacheContinuation::replyLookupEvent()
////////////////////////////////////////////////////////////////////////////
void
-cache_lookup_ClusterFunction(ClusterMachine * from, void *data, int len)
+cache_lookup_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
- (void) from;
(void) len;
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
@@ -2642,7 +2650,7 @@ cache_lookup_ClusterFunction(ClusterMachine * from, void *data, int len)
MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
c->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
c->seq_number = msg->seq_number;
- c->from = from;
+ c->from = ch->machine;
c->url_md5 = msg->url_md5;
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::replyLookupEvent);
@@ -2692,7 +2700,7 @@ CacheContinuation::replyLookupEvent(int event, void *d)
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg(seq_number, event, "cache_result");
#endif
- clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
+ clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
}
} else {
//////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterConfig.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterConfig.cc b/iocore/cluster/ClusterConfig.cc
index 0120158..94a0c5f 100644
--- a/iocore/cluster/ClusterConfig.cc
+++ b/iocore/cluster/ClusterConfig.cc
@@ -162,15 +162,20 @@ make_cluster_connections(MachineList * l, MachineList * old)
// Connect to all new machines.
//
uint32_t ip = this_cluster_machine()->ip;
+ int num_connections = this_cluster_machine()->num_connections;
if (l) {
- for (int i = 0; i < l->n; i++)
+ for (int i = 0; i < l->n; i++) {
#ifdef LOCAL_CLUSTER_TEST_MODE
- if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port)))
+ if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port))) {
#else
- if (ip < l->machine[i].ip)
+ if (ip < l->machine[i].ip) {
#endif
- clusterProcessor.connect(l->machine[i].ip, l->machine[i].port);
+ for (int j = 0; j < num_connections; j++) {
+ clusterProcessor.connect(l->machine[i].ip, l->machine[i].port, j);
+ }
+ }
+ }
}
}
@@ -470,7 +475,7 @@ cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine **
continue;
}
- return m;
+ return (m != this_cluster_machine()) ? m : NULL;
}
return NULL;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterHandler.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterHandler.cc b/iocore/cluster/ClusterHandler.cc
index 9260ac3..1fd2555 100644
--- a/iocore/cluster/ClusterHandler.cc
+++ b/iocore/cluster/ClusterHandler.cc
@@ -141,6 +141,9 @@ ClusterHandler::ClusterHandler()
hostname(NULL),
machine(NULL),
ifd(-1),
+ id(-1),
+ dead(true),
+ downing(false),
active(false),
on_stolen_thread(false),
n_channels(0),
@@ -226,12 +229,19 @@ ClusterHandler::ClusterHandler()
ClusterHandler::~ClusterHandler()
{
+ bool free_m = false;
if (net_vc) {
net_vc->do_io(VIO::CLOSE);
net_vc = 0;
}
- if (machine)
- free_ClusterMachine(machine);
+ if (machine) {
+ MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
+ if (++machine->free_connections >= machine->num_connections)
+ free_m = true;
+ MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
+ if (free_m)
+ free_ClusterMachine(machine);
+ }
machine = NULL;
ats_free(hostname);
hostname = NULL;
@@ -302,10 +312,10 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
ink_assert(!vc->write_list_bytes);
ink_assert(!vc->write_bytes_in_transit);
- if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->machine) {
+ if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->ch) {
CloseMessage msg;
- int vers = CloseMessage::protoToVersion(vc->machine->msg_proto_major);
+ int vers = CloseMessage::protoToVersion(vc->ch->machine->msg_proto_major);
void *data;
int len;
@@ -323,7 +333,7 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
//////////////////////////////////////////////////////////////
ink_release_assert(!"close_ClusterVConnection() bad msg version");
}
- clusterProcessor.invoke_remote(vc->machine, CLOSE_CHANNEL_CLUSTER_FUNCTION, data, len);
+ clusterProcessor.invoke_remote(vc->ch, CLOSE_CHANNEL_CLUSTER_FUNCTION, data, len);
}
ink_hrtime now = ink_get_hrtime();
CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
@@ -858,7 +868,7 @@ ClusterHandler::process_set_data_msgs()
if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
&& (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
- clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(machine, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t));
+ clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t));
// Mark message as processed.
*((uint32_t *) (p + sizeof(uint32_t))) = ~*((uint32_t *) (p + sizeof(uint32_t)));
p += (2 * sizeof(uint32_t)) + (len - sizeof(uint32_t));
@@ -892,7 +902,7 @@ ClusterHandler::process_set_data_msgs()
&& (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
char *p = ic->data;
- clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(machine,
+ clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this,
(void *) (p + sizeof(int32_t)), ic->len - sizeof(int32_t));
// Reverse swap since this will be processed again for deallocation.
@@ -953,7 +963,7 @@ ClusterHandler::process_small_control_msgs()
// Cluster function, can only be processed in ET_CLUSTER thread
//////////////////////////////////////////////////////////////////////
p += sizeof(uint32_t);
- clusterFunction[cluster_function_index].pfn(machine, p, len - sizeof(int32_t));
+ clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
p += (len - sizeof(int32_t));
} else {
@@ -1011,7 +1021,7 @@ ClusterHandler::process_large_control_msgs()
} else if (clusterFunction[cluster_function_index].ClusterFunc) {
// Cluster message, process in ET_CLUSTER thread
- clusterFunction[cluster_function_index].pfn(machine, (void *) (ic->data + sizeof(int32_t)),
+ clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
ic->len - sizeof(int32_t));
// Deallocate memory
@@ -1201,7 +1211,7 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m)
// Invoke processing function
////////////////////////////////
ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
- clusterFunction[cluster_function_index].pfn(machine, p, len - sizeof(int32_t));
+ clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t));
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
} else {
@@ -1222,7 +1232,7 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m)
// Invoke processing function
////////////////////////////////
ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
- clusterFunction[cluster_function_index].pfn(machine, (void *) (ic->data + sizeof(int32_t)),
+ clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
ic->len - sizeof(int32_t));
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
@@ -1732,7 +1742,7 @@ ClusterHandler::build_controlmsg_descriptors()
control_msgs_built++;
if (clusterFunction[*(int32_t *) c->data].post_pfn) {
- clusterFunction[*(int32_t *) c->data].post_pfn(machine, c->data + sizeof(int32_t), c->len);
+ clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
}
continue;
}
@@ -1829,7 +1839,7 @@ ClusterHandler::build_controlmsg_descriptors()
control_msgs_built++;
if (clusterFunction[*(int32_t *) c->data].post_pfn) {
- clusterFunction[*(int32_t *) c->data].post_pfn(machine, c->data + sizeof(int32_t), c->len);
+ clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
}
}
}
@@ -2466,6 +2476,12 @@ ClusterHandler::mainClusterEvent(int event, Event * e)
while (io_activity) {
io_activity = 0;
only_write_control_msgs = 0;
+
+ if (downing) {
+ machine_down();
+ break;
+ }
+
//////////////////////////
// Read Processing
//////////////////////////
@@ -2502,7 +2518,7 @@ ClusterHandler::mainClusterEvent(int event, Event * e)
}
#ifdef CLUSTER_IMMEDIATE_NETIO
- if (!machine->dead && ((event == EVENT_POLL) || (event == EVENT_INTERVAL))) {
+ if (!dead && ((event == EVENT_POLL) || (event == EVENT_INTERVAL))) {
if (res >= 0) {
build_poll(true);
}
@@ -2518,7 +2534,7 @@ ClusterHandler::process_read(ink_hrtime now)
#ifdef CLUSTER_STATS
_process_read_calls++;
#endif
- if (machine->dead) {
+ if (dead) {
// Node is down
return 0;
}
@@ -3011,7 +3027,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
//
// periodic activities
//
- if (!on_stolen_thread && !cur_vcs && !machine->dead) {
+ if (!on_stolen_thread && !cur_vcs && !dead) {
//
// check if this machine is supposed to be in the cluster
//
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterHandlerBase.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterHandlerBase.cc b/iocore/cluster/ClusterHandlerBase.cc
index baa97af..c184e89 100644
--- a/iocore/cluster/ClusterHandlerBase.cc
+++ b/iocore/cluster/ClusterHandlerBase.cc
@@ -182,7 +182,7 @@ OutgoingControl::alloc()
}
OutgoingControl::OutgoingControl()
-:m(NULL), submit_time(0)
+:ch(NULL), submit_time(0)
{
}
@@ -195,7 +195,6 @@ OutgoingControl::startEvent(int event, Event * e)
//
(void) event;
(void) e;
- ClusterHandler *ch = m->clusterHandler;
// verify that the machine has not gone down
if (!ch || !ch->thread)
return EVENT_DONE;
@@ -741,7 +740,7 @@ ClusterHandler::machine_down()
{
char textbuf[sizeof("255.255.255.255:65535")];
- if (machine->dead) {
+ if (dead) {
return EVENT_DONE;
}
//
@@ -754,7 +753,7 @@ ClusterHandler::machine_down()
#ifdef LOCAL_CLUSTER_TEST_MODE
Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), port);
#else
- Note("machine down %u.%u.%u.%u", DOT_SEPARATED(ip));
+ Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), id);
#endif
#ifdef NON_MODULAR
machine_offline_APIcallout(ip);
@@ -771,7 +770,8 @@ ClusterHandler::machine_down()
MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
ClusterConfiguration *c = this_cluster()->current_configuration();
- if (c->find(ip, port)) {
+ machine->clusterHandlers[id] = NULL;
+ if ((--machine->now_connections == 0) && c->find(ip, port)) {
ClusterConfiguration *cc = configuration_remove_machine(c, machine);
CLUSTER_DECREMENT_DYN_STAT(CLUSTER_NODES_STAT);
this_cluster()->configurations.push(cc);
@@ -780,7 +780,7 @@ ClusterHandler::machine_down()
MachineList *cc = the_cluster_config();
if (cc && cc->find(ip, port) && connector) {
Debug(CL_NOTE, "cluster connect retry for %hhu.%hhu.%hhu.%hhu", DOT_SEPARATED(ip));
- clusterProcessor.connect(ip, port);
+ clusterProcessor.connect(ip, port, id);
}
return zombify(); // defer deletion of *this
}
@@ -793,8 +793,7 @@ ClusterHandler::zombify(Event * e)
// Node associated with *this is declared down, setup the event to cleanup
// and defer deletion of *this
//
- machine->clusterHandler = NULL;
- machine->dead = true;
+ dead = true;
if (cluster_periodic_event) {
cluster_periodic_event->cancel(this);
cluster_periodic_event = NULL;
@@ -910,6 +909,8 @@ ClusterHandler::startClusterEvent(int event, Event * e)
nodeClusteringVersion._port = cluster_port;
#endif
cluster_connect_state = ClusterHandler::CLCON_SEND_MSG_COMPLETE;
+ if (connector)
+ nodeClusteringVersion._id = id;
build_data_vector((char *) &nodeClusteringVersion, sizeof(nodeClusteringVersion), false);
if (!write.doIO()) {
// i/o not initiated, delay and retry
@@ -1019,29 +1020,57 @@ ClusterHandler::startClusterEvent(int event, Event * e)
break; // goto next state
}
- // include this node into the cluster configuration
- MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
- MachineList *cc = the_cluster_config();
#ifdef LOCAL_CLUSTER_TEST_MODE
port = clusteringVersion._port & 0xffff;
#endif
+ if (!connector)
+ id = clusteringVersion._id & 0xffff;
+
+ // include this node into the cluster configuration
+ MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
+ MachineList *cc = the_cluster_config();
if (cc && cc->find(ip, port)) {
ClusterConfiguration *c = this_cluster()->current_configuration();
- if (!c->find(ip, port)) {
+ ClusterMachine *m = c->find(ip, port);
+
+ if (!m) { // this first connection
ClusterConfiguration *cconf = configuration_add_machine(c, machine);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT);
this_cluster()->configurations.push(cconf);
} else {
- // duplicate cluster connect, ignore
- failed = -2;
- Debug(CL_NOTE, "duplicate cluster connect %u.%u.%u.%u", DOT_SEPARATED(ip));
+ if (m->num_connections > machine->num_connections) {
+ // close old needlessness connection if new num_connections < old num_connections
+ for (int i = machine->num_connections; i < m->num_connections; i++) {
+ if (m->clusterHandlers[i])
+ m->clusterHandlers[i]->downing = true;
+ }
+ m->num_connections = machine->num_connections;
+ m->free_connections -= (m->num_connections - machine->num_connections);
+ // delete_this
+ failed = -2;
+ MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
+ goto failed;
+ } else {
+ m->num_connections = machine->num_connections;
+ // close new connection if old connections is exist
+ if (id >= m->num_connections || m->clusterHandlers[id]) {
+ failed = -2;
+ MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
+ goto failed;
+ }
+ machine = m;
+ }
}
+ machine->now_connections++;
+ machine->clusterHandlers[id] = this;
+ machine->dead = false;
+ dead = false;
} else {
Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u:%d not in cluster", DOT_SEPARATED(ip), port);
failed = -1;
}
MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
-
+failed:
if (failed) {
if (failed == -1) {
if (++configLookupFails <= CONFIG_LOOKUP_RETRIES) {
@@ -1054,7 +1083,6 @@ ClusterHandler::startClusterEvent(int event, Event * e)
}
this->needByteSwap = !clusteringVersion.NativeByteOrder();
- machine->clusterHandler = this;
machine->msg_proto_major = proto_major;
machine->msg_proto_minor = proto_minor;
#ifdef NON_MODULAR
@@ -1067,8 +1095,8 @@ ClusterHandler::startClusterEvent(int event, Event * e)
Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
DOT_SEPARATED(ip), port, clusteringVersion._major, clusteringVersion._minor);
#else
- Note("machine up %hhu.%hhu.%hhu.%hhu, protocol version=%d.%d",
- DOT_SEPARATED(ip), clusteringVersion._major, clusteringVersion._minor);
+ Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
+ DOT_SEPARATED(ip), id, clusteringVersion._major, clusteringVersion._minor);
#endif
thread = e->ethread;
read_vcs = NEW((new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS]));
@@ -1103,7 +1131,7 @@ ClusterHandler::startClusterEvent(int event, Event * e)
// Start cluster interconnect load monitoring
if (!clm) {
- clm = new ClusterLoadMonitor(machine);
+ clm = new ClusterLoadMonitor(this);
clm->init();
}
return EVENT_DONE;
@@ -1114,10 +1142,8 @@ ClusterHandler::startClusterEvent(int event, Event * e)
{
if (connector) {
Debug(CL_NOTE, "cluster connect retry for %u.%u.%u.%u", DOT_SEPARATED(ip));
- ClusterConfiguration *cc = this_cluster()->current_configuration();
// check for duplicate cluster connect
- if (!cc->find(ip))
- clusterProcessor.connect(ip, port, true);
+ clusterProcessor.connect(ip, port, id, true);
}
cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
break; // goto next state
@@ -1243,7 +1269,7 @@ ClusterHandler::protoZombieEvent(int event, Event * e)
vc = channels[i];
if (VALID_CHANNEL(vc)) {
if (vc->closed) {
- vc->machine = 0;
+ vc->ch = 0;
vc->write_list = 0;
vc->write_list_tail = 0;
vc->write_list_bytes = 0;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterLoadMonitor.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterLoadMonitor.cc b/iocore/cluster/ClusterLoadMonitor.cc
index 848307d..7da7b21 100644
--- a/iocore/cluster/ClusterLoadMonitor.cc
+++ b/iocore/cluster/ClusterLoadMonitor.cc
@@ -49,12 +49,12 @@ int
int
ClusterLoadMonitor::cf_cluster_load_exceed_duration;
-ClusterLoadMonitor::ClusterLoadMonitor(ClusterMachine * m)
-:Continuation(0), machine(m), ping_history_buf_head(0),
+ClusterLoadMonitor::ClusterLoadMonitor(ClusterHandler * ch)
+:Continuation(0), ch(ch), ping_history_buf_head(0),
periodic_action(0), cluster_overloaded(0), cancel_periodic(0),
cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0)
{
- mutex = this->machine->clusterHandler->mutex;
+ mutex = this->ch->mutex;
SET_HANDLER(&ClusterLoadMonitor::cluster_load_periodic);
ping_message_send_msec_interval = cf_ping_message_send_msec_interval ? cf_ping_message_send_msec_interval : 100;
@@ -220,20 +220,20 @@ ClusterLoadMonitor::compute_cluster_load()
}
Debug("cluster_monitor",
"[%u.%u.%u.%u] overload=%d, clear=%d, exceed=%d, latency=%d",
- DOT_SEPARATED(this->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket);
+ DOT_SEPARATED(this->ch->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket);
}
void
ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number)
{
#ifdef CLUSTER_TOMCAT
- ProxyMutex *mutex = this->machine->clusterHandler->mutex; // hack for stats
+ ProxyMutex *mutex = this->ch->mutex; // hack for stats
#endif
CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time);
int bucket = (int)
(response_time / HRTIME_MSECONDS(msecs_per_ping_response_bucket));
- Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->machine->ip), bucket, sequence_number);
+ Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->ch->machine->ip), bucket, sequence_number);
if (bucket >= num_ping_response_buckets)
bucket = num_ping_response_buckets - 1;
@@ -255,12 +255,11 @@ ClusterLoadMonitor::recv_cluster_load_msg(cluster_load_ping_msg * m)
}
void
-ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterMachine * from, void *data, int len)
+ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterHandler *ch, void *data, int len)
{
// Global cluster load ping message return handler which
// dispatches the result to the class specific handler.
- ClusterHandler *ch = from->clusterHandler;
if (ch) {
if (len == sizeof(struct cluster_load_ping_msg)) {
struct cluster_load_ping_msg m;
@@ -283,7 +282,7 @@ ClusterLoadMonitor::send_cluster_load_msg(ink_hrtime current_time)
m.sequence_number = cluster_load_msg_sequence_number++;
m.send_time = current_time;
- cluster_ping(machine, cluster_load_ping_rethandler, (void *) &m, sizeof(m));
+ cluster_ping(ch, cluster_load_ping_rethandler, (void *) &m, sizeof(m));
}
int
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterMachine.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterMachine.cc b/iocore/cluster/ClusterMachine.cc
index 31fab65..cd056e2 100644
--- a/iocore/cluster/ClusterMachine.cc
+++ b/iocore/cluster/ClusterMachine.cc
@@ -68,9 +68,13 @@ dead(false),
hostname(ahostname),
ip(aip),
cluster_port(aport),
+num_connections(0),
+now_connections(0),
+free_connections(0),
+rr_count(0),
msg_proto_major(0),
msg_proto_minor(0),
-clusterHandler(0)
+clusterHandlers(0)
{
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
@@ -141,11 +145,23 @@ clusterHandler(0)
hostname_len = strlen(hostname);
else
hostname_len = 0;
+
+ IOCORE_ReadConfigInteger(num_connections, "proxy.config.cluster.num_of_cluster_connections");
+ clusterHandlers = (ClusterHandler **)ats_calloc(num_connections, sizeof(ClusterHandler *));
+}
+
+ClusterHandler *ClusterMachine::pop_ClusterHandler(int no_rr) // round-robin pick of one handler for this machine
+{
+ if (!clusterHandlers || num_connections <= 0) return NULL; // no slots: avoid modulo by zero; callers shown in this patch null-check the result
+ int64_t now = rr_count;
+ if (no_rr == 0) ink_atomic_increment64(&rr_count, 1); // no_rr != 0 only peeks at the current slot without advancing
+ return this->clusterHandlers[(uint64_t) now % (uint64_t) this->num_connections]; // unsigned math: index can never be negative
}
ClusterMachine::~ClusterMachine()
{
ats_free(hostname);
+ ats_free(clusterHandlers);
}
#ifndef INK_NO_CLUSTER
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterProcessor.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterProcessor.cc b/iocore/cluster/ClusterProcessor.cc
index 6f20f5e..aa2b188 100644
--- a/iocore/cluster/ClusterProcessor.cc
+++ b/iocore/cluster/ClusterProcessor.cc
@@ -52,7 +52,7 @@ ClusterProcessor::~ClusterProcessor()
}
int
-ClusterProcessor::internal_invoke_remote(ClusterMachine * m, int cluster_fn,
+ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn,
void *data, int len, int options, void *cmsg)
{
EThread *thread = this_ethread();
@@ -67,7 +67,6 @@ ClusterProcessor::internal_invoke_remote(ClusterMachine * m, int cluster_fn,
bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
OutgoingControl *c;
- ClusterHandler *ch = m->clusterHandler;
if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
// Invalid message or node is down, free message data
if (malloced) {
@@ -94,7 +93,6 @@ ClusterProcessor::internal_invoke_remote(ClusterMachine * m, int cluster_fn,
c = OutgoingControl::alloc();
}
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
- c->m = m;
c->submit_time = ink_get_hrtime();
if (malloced) {
@@ -152,13 +150,13 @@ ClusterProcessor::internal_invoke_remote(ClusterMachine * m, int cluster_fn,
}
int
-ClusterProcessor::invoke_remote(ClusterMachine * m, int cluster_fn, void *data, int len, int options)
+ClusterProcessor::invoke_remote(ClusterHandler *ch, int cluster_fn, void *data, int len, int options)
{
- return internal_invoke_remote(m, cluster_fn, data, len, options, (void *) NULL);
+ return internal_invoke_remote(ch, cluster_fn, data, len, options, (void *) NULL);
}
int
-ClusterProcessor::invoke_remote_data(ClusterMachine * m, int cluster_fn,
+ClusterProcessor::invoke_remote_data(ClusterHandler *ch, int cluster_fn,
void *data, int data_len,
IOBufferBlock * buf,
int dest_channel, ClusterVCToken * token,
@@ -166,7 +164,7 @@ ClusterProcessor::invoke_remote_data(ClusterMachine * m, int cluster_fn,
{
if (!buf) {
// No buffer data, translate this into a invoke_remote() request
- return internal_invoke_remote(m, cluster_fn, data, data_len, options, (void *) NULL);
+ return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL);
}
ink_assert(data);
ink_assert(data_len);
@@ -197,7 +195,7 @@ ClusterProcessor::invoke_remote_data(ClusterMachine * m, int cluster_fn,
*(int32_t *) chdr->data = -1; // always -1 for compound message
memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
- return internal_invoke_remote(m, cluster_fn, data, data_len, options, (void *) chdr);
+ return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
}
void
@@ -239,7 +237,7 @@ ClusterProcessor::open_local(Continuation * cont, ClusterMachine * m, ClusterVCT
bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
- ClusterHandler *ch = m->clusterHandler;
+ ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
if (!ch)
return NULL;
EThread *t = ch->thread;
@@ -252,8 +250,9 @@ ClusterProcessor::open_local(Continuation * cont, ClusterMachine * m, ClusterVCT
vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
vc->start_time = ink_get_hrtime();
vc->last_activity_time = vc->start_time;
- vc->machine = m;
+ vc->ch = ch;
vc->token.alloc();
+ vc->token.ch_id = ch->id;
token = vc->token;
#ifdef CLUSTER_THREAD_STEALING
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
@@ -306,7 +305,9 @@ ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int
#endif
if (!m)
return NULL;
- ClusterHandler *ch = m->clusterHandler;
+ if (token->ch_id >= (uint32_t)m->num_connections)
+ return NULL;
+ ClusterHandler *ch = m->clusterHandlers[token->ch_id];
if (!ch)
return NULL;
EThread *t = ch->thread;
@@ -319,7 +320,7 @@ ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int
vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
vc->start_time = ink_get_hrtime();
vc->last_activity_time = vc->start_time;
- vc->machine = m;
+ vc->ch = ch;
vc->token = *token;
vc->channel = channel;
#ifdef CLUSTER_THREAD_STEALING
@@ -354,11 +355,7 @@ ClusterProcessor::connect_local(Continuation * cont, ClusterVCToken * token, int
bool ClusterProcessor::disable_remote_cluster_ops(ClusterMachine * m)
{
- if (!m)
- return false;
-
- ClusterHandler *
- ch = m->clusterHandler;
+ ClusterHandler *ch = m->pop_ClusterHandler(1);
if (ch) {
return ch->disable_remote_cluster_ops;
} else {
@@ -753,7 +750,7 @@ ClusterProcessor::start()
}
void
-ClusterProcessor::connect(char *hostname)
+ClusterProcessor::connect(char *hostname, int16_t id)
{
//
// Construct a cluster link to the given machine
@@ -762,11 +759,12 @@ ClusterProcessor::connect(char *hostname)
SET_CONTINUATION_HANDLER(ch, (ClusterContHandler) & ClusterHandler::connectClusterEvent);
ch->hostname = ats_strdup(hostname);
ch->connector = true;
+ ch->id = id;
eventProcessor.schedule_imm(ch, ET_CLUSTER);
}
void
-ClusterProcessor::connect(unsigned int ip, int port, bool delay)
+ClusterProcessor::connect(unsigned int ip, int port, int16_t id, bool delay)
{
//
// Construct a cluster link to the given machine
@@ -776,6 +774,7 @@ ClusterProcessor::connect(unsigned int ip, int port, bool delay)
ch->ip = ip;
ch->port = port;
ch->connector = true;
+ ch->id = id;
if (delay)
eventProcessor.schedule_in(ch, CLUSTER_MEMBER_DELAY, ET_CLUSTER);
else
@@ -814,7 +813,7 @@ ClusterProcessor::send_machine_list(ClusterMachine * m)
//////////////////////////////////////////////////////////////
ink_release_assert(!"send_machine_list() bad msg version");
}
- invoke_remote(m, MACHINE_LIST_CLUSTER_FUNCTION, data, len);
+ invoke_remote(m->pop_ClusterHandler(), MACHINE_LIST_CLUSTER_FUNCTION, data, len);
}
void
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterRPC.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterRPC.cc b/iocore/cluster/ClusterRPC.cc
index 58302af..9aa132a 100644
--- a/iocore/cluster/ClusterRPC.cc
+++ b/iocore/cluster/ClusterRPC.cc
@@ -34,27 +34,26 @@
/////////////////////////////////////////////////////////////////////////
void
-ping_ClusterFunction(ClusterMachine * from, void *data, int len)
+ping_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
//
// Just return the data back
//
- clusterProcessor.invoke_remote(from, PING_REPLY_CLUSTER_FUNCTION, data, len);
+ clusterProcessor.invoke_remote(ch, PING_REPLY_CLUSTER_FUNCTION, data, len);
}
void
-ping_reply_ClusterFunction(ClusterMachine * from, void *data, int len)
+ping_reply_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
//
// Pass back the data.
//
- (void) from;
PingMessage *msg = (PingMessage *) data;
- msg->fn(from, msg->data, (len - msg->sizeof_fixedlen_msg()));
+ msg->fn(ch, msg->data, (len - msg->sizeof_fixedlen_msg()));
}
void
-machine_list_ClusterFunction(ClusterMachine * from, void *data, int len)
+machine_list_ClusterFunction(ClusterHandler * from, void *data, int len)
{
(void) from;
ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
@@ -82,14 +81,19 @@ machine_list_ClusterFunction(ClusterMachine * from, void *data, int len)
goto Lfound;
}
// not found, must be a new machine
- clusterProcessor.connect(m->ip[i]);
+ {
+ int num_connections = this_cluster_machine()->num_connections; // open one link per local connection slot
+ for (int k = 0; k < num_connections; k++) {
+ clusterProcessor.connect(m->ip[i], 0, k); // port stays 0 as before; k is the handler id (2nd arg is port, 3rd is id)
+ }
+ }
Lfound:
;
}
}
void
-close_channel_ClusterFunction(ClusterMachine * from, void *data, int len)
+close_channel_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
CloseMessage *m = (CloseMessage *) data;
@@ -106,7 +110,6 @@ close_channel_ClusterFunction(ClusterMachine * from, void *data, int len)
// Close the remote side of a VC connection (remote node is originator)
//
ink_assert(len >= (int) sizeof(CloseMessage));
- ClusterHandler *ch = from->clusterHandler;
if (!ch || !ch->channels)
return;
ClusterVConnection *vc = ch->channels[m->channel];
@@ -117,13 +120,13 @@ close_channel_ClusterFunction(ClusterMachine * from, void *data, int len)
}
void
-test_ClusterFunction(ClusterMachine * m, void *data, int len)
+test_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
//
// Note: Only for testing.
//
if (ptest_ClusterFunction)
- ptest_ClusterFunction(m, data, len);
+ ptest_ClusterFunction(ch, data, len);
}
CacheVC *
@@ -157,7 +160,7 @@ ChannelToCacheWriteVC(ClusterHandler * ch, int channel, uint32_t channel_seqno,
}
void
-set_channel_data_ClusterFunction(ClusterMachine * from, void *tdata, int tlen)
+set_channel_data_ClusterFunction(ClusterHandler *ch, void *tdata, int tlen)
{
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
@@ -189,7 +192,6 @@ set_channel_data_ClusterFunction(ClusterMachine * from, void *tdata, int tlen)
ClusterVConnection *cvc;
CacheVC *cache_vc;
- ClusterHandler *ch = from->clusterHandler;
if (ch) {
cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
@@ -228,7 +230,7 @@ set_channel_data_ClusterFunction(ClusterMachine * from, void *tdata, int tlen)
}
void
-post_setchan_send_ClusterFunction(ClusterMachine * to, void *data, int len)
+post_setchan_send_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
NOWARN_UNUSED(len);
EThread *thread = this_ethread();
@@ -240,7 +242,6 @@ post_setchan_send_ClusterFunction(ClusterMachine * to, void *data, int len)
// initial open_write data after (n_set_data_msgs == 0).
SetChanDataMessage *m = (SetChanDataMessage *) data;
- ClusterHandler *ch = to->clusterHandler;
ClusterVConnection *cvc;
if (ch) {
@@ -256,7 +257,7 @@ post_setchan_send_ClusterFunction(ClusterMachine * to, void *data, int len)
}
void
-set_channel_pin_ClusterFunction(ClusterMachine * from, void *data, int len)
+set_channel_pin_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
NOWARN_UNUSED(len);
// This isn't used. /leif
@@ -279,9 +280,8 @@ set_channel_pin_ClusterFunction(ClusterMachine * from, void *data, int len)
ClusterVConnection *cvc = NULL; // Just to make GCC happy
CacheVC *cache_vc;
- ClusterHandler *ch;
- if ((ch = from->clusterHandler) != 0) {
+ if (ch != 0) {
cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
if (cache_vc) {
cache_vc->set_pin_in_cache(m->pin_time);
@@ -292,7 +292,7 @@ set_channel_pin_ClusterFunction(ClusterMachine * from, void *data, int len)
}
void
-post_setchan_pin_ClusterFunction(ClusterMachine * to, void *data, int len)
+post_setchan_pin_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
NOWARN_UNUSED(len);
EThread *thread = this_ethread();
@@ -304,7 +304,6 @@ post_setchan_pin_ClusterFunction(ClusterMachine * to, void *data, int len)
// initial open_write data after (n_set_data_msgs == 0).
SetChanPinMessage *m = (SetChanPinMessage *) data;
- ClusterHandler *ch = to->clusterHandler;
ClusterVConnection *cvc;
if (ch) {
@@ -320,7 +319,7 @@ post_setchan_pin_ClusterFunction(ClusterMachine * to, void *data, int len)
}
void
-set_channel_priority_ClusterFunction(ClusterMachine * from, void *data, int len)
+set_channel_priority_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
NOWARN_UNUSED(len);
// This isn't used.
@@ -342,9 +341,8 @@ set_channel_priority_ClusterFunction(ClusterMachine * from, void *data, int len)
ClusterVConnection *cvc = NULL; // Just to make GCC happy
CacheVC *cache_vc;
- ClusterHandler *ch;
- if ((ch = from->clusterHandler) != 0) {
+ if (ch != 0) {
cache_vc = ChannelToCacheWriteVC(ch, m->channel, m->sequence_number, &cvc);
if (cache_vc) {
cache_vc->set_disk_io_priority(m->disk_priority);
@@ -355,7 +353,7 @@ set_channel_priority_ClusterFunction(ClusterMachine * from, void *data, int len)
}
void
-post_setchan_priority_ClusterFunction(ClusterMachine * to, void *data, int len)
+post_setchan_priority_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
NOWARN_UNUSED(len);
EThread *thread = this_ethread();
@@ -368,7 +366,6 @@ post_setchan_priority_ClusterFunction(ClusterMachine * to, void *data, int len)
// initial open_write data after (n_set_data_msgs == 0).
SetChanPriorityMessage *m = (SetChanPriorityMessage *) data;
- ClusterHandler *ch = to->clusterHandler;
ClusterVConnection *cvc;
if (ch) {
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/ClusterVConnection.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterVConnection.cc b/iocore/cluster/ClusterVConnection.cc
index bec5445..84dd223 100644
--- a/iocore/cluster/ClusterVConnection.cc
+++ b/iocore/cluster/ClusterVConnection.cc
@@ -186,7 +186,7 @@ ClusterVConnectionBase::reenable_re(VIO * vio)
}
ClusterVConnection::ClusterVConnection(int is_new_connect_read)
- : machine(NULL),
+ : ch(NULL),
new_connect_read(is_new_connect_read),
remote_free(0),
last_local_free(0),
@@ -306,7 +306,6 @@ ClusterVConnection::start(EThread * t)
///////////////////////////////////////////////////////////////////////////
int status;
- ClusterHandler *ch = NULL;
if (!channel) {
#ifdef CLUSTER_TOMCAT
Ptr<ProxyMutex> m = action_.mutex;
@@ -323,7 +322,6 @@ ClusterVConnection::start(EThread * t)
t->schedule_in(this, CLUSTER_CONNECT_RETRY);
return EVENT_DONE;
}
- ch = machine->clusterHandler;
if (!ch) {
if (action_.continuation) {
action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, (void *) -ECLUSTER_NO_MACHINE);
@@ -356,7 +354,6 @@ ClusterVConnection::start(EThread * t)
} else {
// Establish the remote side of the VC connection
- ch = machine->clusterHandler;
if ((status = ch->alloc_channel(this, channel)) < 0) {
Debug(CL_TRACE, "VC start alloc remote failed chan=%d VC=%p", channel, this);
clusterVCAllocator_free(this);
@@ -504,7 +501,7 @@ ClusterVConnection::set_http_info(CacheHTTPInfo * d)
// already done
ink_release_assert(this->read.vio.op == VIO::NONE); // should always be true
- int vers = SetChanDataMessage::protoToVersion(machine->msg_proto_major);
+ int vers = SetChanDataMessage::protoToVersion(ch->machine->msg_proto_major);
if (vers == SetChanDataMessage::SET_CHANNEL_DATA_MESSAGE_VERSION) {
flen = SetChanDataMessage::sizeof_fixedlen_msg();
} else {
@@ -537,7 +534,7 @@ ClusterVConnection::set_http_info(CacheHTTPInfo * d)
// note pending set_data() msgs on VC.
ink_atomic_increment(&n_set_data_msgs, 1);
- clusterProcessor.invoke_remote(machine, SET_CHANNEL_DATA_CLUSTER_FUNCTION, data, flen + len);
+ clusterProcessor.invoke_remote(ch, SET_CHANNEL_DATA_CLUSTER_FUNCTION, data, flen + len);
}
bool ClusterVConnection::set_pin_in_cache(time_t t)
@@ -554,7 +551,7 @@ bool ClusterVConnection::set_pin_in_cache(time_t t)
ink_release_assert(this->read.vio.op == VIO::NONE); // should always be true
time_pin = t;
- int vers = SetChanPinMessage::protoToVersion(machine->msg_proto_major);
+ int vers = SetChanPinMessage::protoToVersion(ch->machine->msg_proto_major);
if (vers == SetChanPinMessage::SET_CHANNEL_PIN_MESSAGE_VERSION) {
SetChanPinMessage::sizeof_fixedlen_msg();
@@ -571,7 +568,7 @@ bool ClusterVConnection::set_pin_in_cache(time_t t)
// note pending set_data() msgs on VC.
ink_atomic_increment(&n_set_data_msgs, 1);
- clusterProcessor.invoke_remote(machine, SET_CHANNEL_PIN_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
+ clusterProcessor.invoke_remote(ch, SET_CHANNEL_PIN_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
return true;
}
@@ -594,7 +591,7 @@ bool ClusterVConnection::set_disk_io_priority(int priority)
ink_release_assert(this->read.vio.op == VIO::NONE); // should always be true
disk_io_priority = priority;
- int vers = SetChanPriorityMessage::protoToVersion(machine->msg_proto_major);
+ int vers = SetChanPriorityMessage::protoToVersion(ch->machine->msg_proto_major);
if (vers == SetChanPriorityMessage::SET_CHANNEL_PRIORITY_MESSAGE_VERSION) {
SetChanPriorityMessage::sizeof_fixedlen_msg();
@@ -611,7 +608,7 @@ bool ClusterVConnection::set_disk_io_priority(int priority)
// note pending set_data() msgs on VC.
ink_atomic_increment(&n_set_data_msgs, 1);
- clusterProcessor.invoke_remote(machine, SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
+ clusterProcessor.invoke_remote(ch, SET_CHANNEL_PRIORITY_CLUSTER_FUNCTION, (char *) &msg, sizeof(msg));
return true;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterCache.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterCache.h b/iocore/cluster/P_ClusterCache.h
index 4afc6ae..4fac3c0 100644
--- a/iocore/cluster/P_ClusterCache.h
+++ b/iocore/cluster/P_ClusterCache.h
@@ -63,7 +63,7 @@
// changes
//
#define CLUSTER_MAJOR_VERSION 3
-#define CLUSTER_MINOR_VERSION 0
+#define CLUSTER_MINOR_VERSION 2
// Lowest supported major/minor cluster version
#define MIN_CLUSTER_MAJOR_VERSION CLUSTER_MAJOR_VERSION
@@ -286,6 +286,7 @@ struct ClusterVCToken
// Marshal this data to send the token across the cluster
//
uint32_t ip_created;
+ uint32_t ch_id;
uint32_t sequence_number;
bool is_clear()
@@ -298,8 +299,8 @@ struct ClusterVCToken
sequence_number = 0;
}
- ClusterVCToken(unsigned int aip = 0, unsigned int aseq = 0)
-: ip_created(aip), sequence_number(aseq) {
+ ClusterVCToken(unsigned int aip = 0, unsigned int id = 0, unsigned int aseq = 0)
+: ip_created(aip), ch_id(id), sequence_number(aseq) {
}
//
// Private
@@ -308,6 +309,7 @@ struct ClusterVCToken
inline void SwapBytes()
{
+ ats_swap32(&ch_id);
ats_swap32(&sequence_number);
}
};
@@ -317,7 +319,7 @@ struct ClusterVCToken
// A pointer to a procedure which can be invoked accross the cluster.
// This must be registered.
//
-typedef void ClusterFunction(ClusterMachine * from, void *data, int len);
+typedef void ClusterFunction(ClusterHandler * ch, void *data, int len);
typedef ClusterFunction *ClusterFunctionPtr;
struct ClusterVConnectionBase;
@@ -515,7 +517,7 @@ struct ClusterVConnection: public ClusterVConnectionBase
virtual void do_io_close(int lerrno = -1);
- ClusterMachine *machine;
+ ClusterHandler *ch;
//
// Read Channel: (new_connect_read == true)
// - open_local() caller is reader
@@ -631,18 +633,18 @@ struct ClusterProcessor
// Options: CLUSTER_OPT_DELAY, CLUSTER_OPT_STEAL, CLUSTER_OPT_DATA_IS_OCONTROL
// Returns: 1 for immediate send, 0 for delayed, -1 for error
- int invoke_remote(ClusterMachine * m, int cluster_fn_index, void *data, int len, int options = CLUSTER_OPT_STEAL);
+ int invoke_remote(ClusterHandler *ch, int cluster_fn_index, void *data, int len, int options = CLUSTER_OPT_STEAL);
- int invoke_remote_data(ClusterMachine * m, int cluster_fn_index,
+ int invoke_remote_data(ClusterHandler *ch, int cluster_fn_index,
void *data, int data_len,
IOBufferBlock * buf,
int logical_channel, ClusterVCToken * token,
void (*bufdata_free) (void *), void *bufdata_free_arg, int options = CLUSTER_OPT_STEAL);
// Pass the data in as a malloc'ed block to be freed by callee
- int invoke_remote_malloced(ClusterMachine * m, ClusterRemoteDataHeader * data, int len /* including header */ )
+ int invoke_remote_malloced(ClusterHandler *ch, ClusterRemoteDataHeader * data, int len /* including header */ )
{
- return invoke_remote(m, CLUSTER_FUNCTION_MALLOCED, data, len);
+ return invoke_remote(ch, CLUSTER_FUNCTION_MALLOCED, data, len);
}
void free_remote_data(char *data, int len);
@@ -662,7 +664,7 @@ struct ClusterProcessor
#define CLUSTER_DELAYED_OPEN ((ClusterVConnection*)-1)
#define CLUSTER_NODE_DOWN ((ClusterVConnection*)-2)
- ClusterVConnection *open_local(Continuation * cont, ClusterMachine * m, ClusterVCToken & token, int options = 0);
+ ClusterVConnection *open_local(Continuation * cont, ClusterMachine * mp, ClusterVCToken & token, int options = 0);
// Get the other side of a remote VConnection which was previously
// allocated with open.
@@ -688,13 +690,13 @@ struct ClusterProcessor
ClusterAccept *accept_handler;
Cluster *this_cluster;
// Connect to a new cluster machine
- void connect(char *hostname);
- void connect(unsigned int ip, int port = 0, bool delay = false);
+ void connect(char *hostname, int16_t id = -1);
+ void connect(unsigned int ip, int port = 0, int16_t id = -1, bool delay = false);
// send the list of known machines to new machine
void send_machine_list(ClusterMachine * m);
void compute_cluster_mode();
// Internal invoke_remote interface
- int internal_invoke_remote(ClusterMachine * m, int cluster_fn, void *data, int len, int options, void *cmsg);
+ int internal_invoke_remote(ClusterHandler * m, int cluster_fn, void *data, int len, int options, void *cmsg);
};
inkcoreapi extern ClusterProcessor clusterProcessor;
@@ -996,11 +998,12 @@ struct ClusterHelloMessage
uint16_t _minor;
uint16_t _min_major;
uint16_t _min_minor;
+ int16_t _id;
#ifdef LOCAL_CLUSTER_TEST_MODE
int16_t _port;
- char _pad[116]; // pad out to 128 bytes
+ char _pad[114]; // pad out to 128 bytes
#else
- char _pad[118]; // pad out to 128 bytes
+ char _pad[116]; // pad out to 128 bytes
#endif
ClusterHelloMessage():_NativeByteOrder(1)
@@ -1068,7 +1071,7 @@ struct ClusterMessageHeader
//
// cluster_ping
//
-typedef void (*PingReturnFunction) (ClusterMachine *, void *data, int len);
+typedef void (*PingReturnFunction) (ClusterHandler *, void *data, int len);
struct PingMessage:public ClusterMessageHeader
{
@@ -1108,13 +1111,13 @@ struct PingMessage:public ClusterMessageHeader
};
inline void
-cluster_ping(ClusterMachine * m, PingReturnFunction fn, void *data, int len)
+cluster_ping(ClusterHandler *ch, PingReturnFunction fn, void *data, int len)
{
PingMessage *msg = (PingMessage *)alloca(PingMessage::sizeof_fixedlen_msg() + len);
msg->init();
msg->fn = fn;
memcpy(msg->data, data, len);
- clusterProcessor.invoke_remote(m, PING_CLUSTER_FUNCTION, (void *) msg, (msg->sizeof_fixedlen_msg() + len));
+ clusterProcessor.invoke_remote(ch, PING_CLUSTER_FUNCTION, (void *) msg, (msg->sizeof_fixedlen_msg() + len));
}
// filled with 0's
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterCacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterCacheInternal.h b/iocore/cluster/P_ClusterCacheInternal.h
index 3cef837..aea7beb 100644
--- a/iocore/cluster/P_ClusterCacheInternal.h
+++ b/iocore/cluster/P_ClusterCacheInternal.h
@@ -104,6 +104,7 @@ struct CacheContinuation:public Continuation
ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH];
ink_hrtime start_time;
ClusterMachine *from;
+ ClusterHandler *ch;
VConnection *cache_vc;
bool cache_read;
int result; // return event code
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterHandler.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterHandler.h b/iocore/cluster/P_ClusterHandler.h
index 35bb6b0..979de00 100644
--- a/iocore/cluster/P_ClusterHandler.h
+++ b/iocore/cluster/P_ClusterHandler.h
@@ -83,7 +83,7 @@ struct ClusterControl: public Continuation
struct OutgoingControl: public ClusterControl
{
- ClusterMachine *m;
+ ClusterHandler *ch;
ink_hrtime submit_time;
static OutgoingControl *alloc();
@@ -430,6 +430,9 @@ struct ClusterHandler:public ClusterHandlerBase
char *hostname;
ClusterMachine *machine;
int ifd;
+ int id;
+ bool dead;
+ bool downing;
int32_t active; // handler currently running
bool on_stolen_thread;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterInline.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterInline.h b/iocore/cluster/P_ClusterInline.h
index d62d54b..c653956 100644
--- a/iocore/cluster/P_ClusterInline.h
+++ b/iocore/cluster/P_ClusterInline.h
@@ -30,6 +30,7 @@
#define __P_CLUSTERINLINE_H__
#include "P_ClusterCacheInternal.h"
#include "P_CacheInternal.h"
+#include "P_ClusterHandler.h"
inline Action *
Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, char *hostname, int host_len)
@@ -37,7 +38,7 @@ Cluster_lookup(Continuation * cont, CacheKey * key, CacheFragType frag_type, cha
// Try to send remote, if not possible, handle locally
Action *retAct;
ClusterMachine *m = cluster_machine_at_depth(cache_hash(*key));
- if (!clusterProcessor.disable_remote_cluster_ops(m)) {
+ if (m && !clusterProcessor.disable_remote_cluster_ops(m)) {
CacheContinuation *cc = CacheContinuation::cacheContAllocator_alloc();
cc->action = cont;
cc->mutex = cont->mutex;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterLoadMonitor.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterLoadMonitor.h b/iocore/cluster/P_ClusterLoadMonitor.h
index 46f9806..89156bd 100644
--- a/iocore/cluster/P_ClusterLoadMonitor.h
+++ b/iocore/cluster/P_ClusterLoadMonitor.h
@@ -70,10 +70,10 @@ public:
}
};
- static void cluster_load_ping_rethandler(ClusterMachine *, void *, int);
+ static void cluster_load_ping_rethandler(ClusterHandler *, void *, int);
public:
- ClusterLoadMonitor(ClusterMachine * m);
+ ClusterLoadMonitor(ClusterHandler * ch);
void init();
~ClusterLoadMonitor();
void cancel_monitor();
@@ -101,7 +101,7 @@ private:
int cluster_load_exceed_duration;
// Class specific data
- ClusterMachine *machine;
+ ClusterHandler *ch;
int *ping_response_buckets;
ink_hrtime *ping_response_history_buf;
int ping_history_buf_head;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/cluster/P_ClusterMachine.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterMachine.h b/iocore/cluster/P_ClusterMachine.h
index a69227d..61aee3a 100644
--- a/iocore/cluster/P_ClusterMachine.h
+++ b/iocore/cluster/P_ClusterMachine.h
@@ -66,12 +66,17 @@ struct ClusterMachine: public Server
//
unsigned int ip;
int cluster_port;
+ int num_connections;
+ int now_connections;
+ int free_connections;
+ int64_t rr_count;
- Link<ClusterMachine> link;
+ Link<ClusterMachine> link;
// default for localhost
- ClusterMachine(char *hostname = NULL, unsigned int ip = 0, int acluster_port = 0);
- ~ClusterMachine();
+ ClusterMachine(char *hostname = NULL, unsigned int ip = 0, int acluster_port = 0);
+ ~ClusterMachine();
+ ClusterHandler *pop_ClusterHandler(int no_rr = 0);
// Cluster message protocol version
uint16_t msg_proto_major;
@@ -79,7 +84,7 @@ struct ClusterMachine: public Server
// Private data for ClusterProcessor
//
- ClusterHandler *clusterHandler;
+ ClusterHandler **clusterHandlers;
};
struct MachineListElement
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/hostdb/HostDB.cc
----------------------------------------------------------------------
diff --git a/iocore/hostdb/HostDB.cc b/iocore/hostdb/HostDB.cc
index 4841fba..4c17f5a 100644
--- a/iocore/hostdb/HostDB.cc
+++ b/iocore/hostdb/HostDB.cc
@@ -1577,7 +1577,7 @@ bool HostDBContinuation::do_get_response(Event * e)
// Send the message
//
- clusterProcessor.invoke_remote(m, GET_HOSTINFO_CLUSTER_FUNCTION, (char *) &msg, len);
+ clusterProcessor.invoke_remote(m->pop_ClusterHandler(), GET_HOSTINFO_CLUSTER_FUNCTION, (char *) &msg, len);
return true;
}
@@ -1650,7 +1650,7 @@ HostDBContinuation::do_put_response(ClusterMachine * m, HostDBInfo * r, Continua
HostDB_put_message msg;
int len = make_put_message(r, c, (char *) &msg, sizeof(HostDB_put_message));
- clusterProcessor.invoke_remote(m, PUT_HOSTINFO_CLUSTER_FUNCTION, (char *) &msg, len);
+ clusterProcessor.invoke_remote(m->pop_ClusterHandler(), PUT_HOSTINFO_CLUSTER_FUNCTION, (char *) &msg, len);
}
#endif // NON_MODULAR
@@ -1935,7 +1935,7 @@ HostDBContinuation::failed_cluster_request(Event * e)
void
-get_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
+get_hostinfo_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
NOWARN_UNUSED(len);
void *pDS = 0;
@@ -1956,7 +1956,7 @@ get_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
HostDBContinuation *c = hostDBContAllocator.alloc();
SET_CONTINUATION_HANDLER(c, (HostDBContHandler) & HostDBContinuation::probeEvent);
- c->from = from;
+ c->from = ch->machine;
c->from_cont = msg->cont;
/* -----------------------------------------
@@ -1975,7 +1975,7 @@ get_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
void
-put_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
+put_hostinfo_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
NOWARN_UNUSED(len);
HostDB_put_message *msg = (HostDB_put_message *) data;
@@ -1988,7 +1988,7 @@ put_hostinfo_ClusterFunction(ClusterMachine * from, void *data, int len)
c->missing = msg->missing;
c->round_robin = msg->round_robin;
c->ttl = msg->ttl;
- c->from = from;
+ c->from = ch->machine;
dnsProcessor.thread->schedule_imm(c);
}
#endif // NON_MODULAR
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/iocore/net/UnixNetAccept.cc
----------------------------------------------------------------------
diff --git a/iocore/net/UnixNetAccept.cc b/iocore/net/UnixNetAccept.cc
index 1ff6511..f2ff274 100644
--- a/iocore/net/UnixNetAccept.cc
+++ b/iocore/net/UnixNetAccept.cc
@@ -163,7 +163,7 @@ NetAccept::allocateGlobal()
// or ET_NET).
EventType NetAccept::getEtype()
{
- return (ET_NET);
+ return etype;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/mgmt/RecordsConfig.cc
----------------------------------------------------------------------
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index d86d57c..d08f59a 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -858,6 +858,8 @@ RecordElement RecordsConfig[] = {
//##############################################################################
{RECT_CONFIG, "proxy.config.cluster.threads", RECD_INT, "1", RECU_RESTART_TS, RR_NULL, RECC_INT, "[0-512]", RECA_NULL}
,
+ {RECT_CONFIG, "proxy.config.cluster.num_of_cluster_connections", RECD_INT, "1", RECU_RESTART_TS, RR_NULL, RECC_INT, "[1-4096]", RECA_NULL}
+ ,
{RECT_CONFIG, "proxy.config.cluster.cluster_port", RECD_INT, "8086", RECU_RESTART_TS, RR_REQUIRED, RECC_NULL, NULL, RECA_NULL}
,
{RECT_CONFIG, "proxy.config.cluster.cluster_configuration", RECD_STRING, "cluster.config", RECU_NULL, RR_NULL, RECC_NULL, NULL, RECA_NULL}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/e67b0e40/proxy/http/HttpCacheSM.cc
----------------------------------------------------------------------
diff --git a/proxy/http/HttpCacheSM.cc b/proxy/http/HttpCacheSM.cc
index f8e26a6..14c9f30 100644
--- a/proxy/http/HttpCacheSM.cc
+++ b/proxy/http/HttpCacheSM.cc
@@ -57,7 +57,8 @@ HttpCacheAction::cancel(Continuation * c)
ink_assert(this->cancelled == 0);
this->cancelled = 1;
- sm->pending_action->cancel();
+ if (sm->pending_action)
+ sm->pending_action->cancel();
}
HttpCacheSM::HttpCacheSM():