You are viewing a plain-text version of this content. The canonical link to it appeared here as a hyperlink, which is not preserved in this copy.
Posted to commits@trafficserver.apache.org by we...@apache.org on 2013/12/04 04:39:06 UTC
[4/6] refine the codes of cluster
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/ClusterConfig.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterConfig.cc b/iocore/cluster/ClusterConfig.cc
index a541364..b969f44 100644
--- a/iocore/cluster/ClusterConfig.cc
+++ b/iocore/cluster/ClusterConfig.cc
@@ -28,6 +28,9 @@
****************************************************************************/
#include "P_Cluster.h"
+#include "machine.h"
+#include "connection.h"
+
// updated from the cluster port configuration variable
int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER;
@@ -155,16 +158,30 @@ ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
}
static void
-make_cluster_connections(MachineList * l)
+make_cluster_connections(MachineList * l, MachineList * old)
{
//
// Connect to all new machines.
//
- uint32_t ip = this_cluster_machine()->ip;
- int num_connections = this_cluster_machine()->num_connections;
+ //uint32_t ip = this_cluster_machine()->ip;
+ //int num_connections = this_cluster_machine()->num_connections;
+ int i;
+ int k;
+ ClusterMachine *m;
if (l) {
- for (int i = 0; i < l->n; i++) {
+ for (i = 0; i < l->n; i++) {
+ struct in_addr in;
+ in.s_addr = l->machine[i].ip;
+ m = add_machine(l->machine[i].ip, l->machine[i].port);
+ if (m != NULL) {
+ machine_make_connections(m);
+ }
+
+ Debug(CL_NOTE, "do connect hostname: %u.%u.%u.%u:%d, %s, cluster_machine_count: %d\n",
+ DOT_SEPARATED(l->machine[i].ip), l->machine[i].port, inet_ntoa(in), cluster_machine_count);
+
+ /*
#ifdef LOCAL_CLUSTER_TEST_MODE
if (ip < l->machine[i].ip || (ip == l->machine[i].ip && (cluster_port < l->machine[i].port))) {
#else
@@ -175,6 +192,48 @@ make_cluster_connections(MachineList * l)
}
}
}
+ */
+ }
+ }
+
+ if (old == NULL) {
+ return;
+ }
+
+ //found down machines
+ if (l == NULL) {
+ for (i = 0; i < old->n; i++) {
+ struct in_addr in;
+ in.s_addr = old->machine[i].ip;
+ Debug(CL_NOTE, "stop connect hostname: %u.%u.%u.%u:%d, %s\n",
+ DOT_SEPARATED(old->machine[i].ip), old->machine[i].port, inet_ntoa(in));
+ m = get_machine(old->machine[i].ip, old->machine[i].port);
+ if (m != NULL) {
+ machine_stop_reconnect(m);
+ }
+ }
+ }
+ else {
+ for (i = 0; i < old->n; i++) {
+ for (k = 0; k < l->n; k++) {
+ if (l->machine[k].ip == old->machine[i].ip &&
+ l->machine[k].port == old->machine[i].port)
+ {
+ break;
+ }
+ }
+
+ if (k == l->n) { //not found, machine down
+ struct in_addr in;
+ in.s_addr = old->machine[i].ip;
+ Debug(CL_NOTE, "stop connect hostname: %u.%u.%u.%u:%d, %s\n",
+ DOT_SEPARATED(old->machine[i].ip), old->machine[i].port, inet_ntoa(in));
+ m = get_machine(old->machine[i].ip, old->machine[i].port);
+ if (m != NULL) {
+ machine_stop_reconnect(m);
+ }
+ }
+ }
}
}
@@ -201,7 +260,7 @@ machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type
case CLUSTER_CONFIG:
old = cluster_config;
cluster_config = l;
- make_cluster_connections(l);
+ make_cluster_connections(l, old);
break;
}
#else
@@ -209,7 +268,7 @@ machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type
old = cluster_config;
machines_config = l;
cluster_config = l;
- make_cluster_connections(l);
+ make_cluster_connections(l, old);
#endif
if (old)
free_MachineList(old);
@@ -291,8 +350,10 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
// Build a new cluster configuration with the new machine.
// Machines are stored in ip sorted order.
//
+ /*
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
+ */
int i = 0;
ClusterConfiguration *cc = NEW(new ClusterConfiguration(*c));
@@ -319,7 +380,7 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
build_cluster_hash_table(cc);
INK_MEMORY_BARRIER; // commit writes before freeing old hash table
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
+ //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
free_configuration(c, cc);
return cc;
@@ -328,9 +389,6 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
ClusterConfiguration *
configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
{
- EThread *thread = this_ethread();
- ProxyMutex *mutex = thread->mutex;
-
//
// Build a new cluster configuration without a machine
//
@@ -350,7 +408,7 @@ configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
build_cluster_hash_table(cc);
INK_MEMORY_BARRIER; // commit writes before freeing old hash table
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
+ //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
free_configuration(c, cc);
return cc;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/ClusterMachine.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterMachine.cc b/iocore/cluster/ClusterMachine.cc
index 00e99e6..1ac4fae 100644
--- a/iocore/cluster/ClusterMachine.cc
+++ b/iocore/cluster/ClusterMachine.cc
@@ -77,9 +77,9 @@ ClusterMachine::ClusterMachine(char *ahostname, unsigned int aip, int aport)
msg_proto_minor(0),
clusterHandlers(0)
{
- EThread *thread = this_ethread();
- ProxyMutex *mutex = thread->mutex;
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
+ //EThread *thread = this_ethread();
+ //ProxyMutex *mutex = thread->mutex;
+ //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_ALLOCATED_STAT);
if (!aip) {
char localhost[1024];
if (!ahostname) {
@@ -166,7 +166,7 @@ ClusterHandler *ClusterMachine::pop_ClusterHandler(int no_rr)
ClusterMachine::~ClusterMachine()
{
ats_free(hostname);
- ats_free(clusterHandlers);
+ // ats_free(clusterHandlers);
}
struct MachineTimeoutContinuation;
@@ -193,10 +193,10 @@ struct MachineTimeoutContinuation: public Continuation
void
free_ClusterMachine(ClusterMachine * m)
{
- EThread *thread = this_ethread();
- ProxyMutex *mutex = thread->mutex;
+ //EThread *thread = this_ethread();
+ //ProxyMutex *mutex = thread->mutex;
// delay before the final free
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
+ //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MACHINES_FREED_STAT);
m->dead = true;
eventProcessor.schedule_in(NEW(new MachineTimeoutContinuation(m)), MACHINE_TIMEOUT, ET_CALL);
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/ClusterProcessor.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterProcessor.cc b/iocore/cluster/ClusterProcessor.cc
index b01e0ff..ab6c0a0 100644
--- a/iocore/cluster/ClusterProcessor.cc
+++ b/iocore/cluster/ClusterProcessor.cc
@@ -28,21 +28,34 @@
****************************************************************************/
#include "P_Cluster.h"
+#include "global.h"
+#include "connection.h"
+
/*************************************************************************/
// ClusterProcessor member functions (Public class)
/*************************************************************************/
int cluster_port_number = DEFAULT_CLUSTER_PORT_NUMBER;
int cache_clustering_enabled = 0;
int num_of_cluster_threads = DEFAULT_NUMBER_OF_CLUSTER_THREADS;
+int num_of_cluster_connections = 0;
ClusterProcessor clusterProcessor;
RecRawStatBlock *cluster_rsb = NULL;
int ET_CLUSTER;
+void cluster_main_handler(ClusterSession session, void *context,
+ const int func_id, IOBufferBlock *data, const int data_len);
ClusterProcessor::ClusterProcessor():accept_handler(NULL), this_cluster(NULL)
{
}
+
+//void cluster_error_handler(int event, void *arg);
+//void cluster_main_handler(ClusterSession *session, const int func_id,
+// void *data, const int data_len) {
+// ClusterRPC[func_id](session, data, data_len);
+//}
+
ClusterProcessor::~ClusterProcessor()
{
if (accept_handler) {
@@ -55,98 +68,111 @@ int
ClusterProcessor::internal_invoke_remote(ClusterHandler *ch, int cluster_fn,
void *data, int len, int options, void *cmsg)
{
- EThread *thread = this_ethread();
- ProxyMutex *mutex = thread->mutex;
- //
- // RPC facility for intercluster communication available to other
- // subsystems.
- //
- bool steal = (options & CLUSTER_OPT_STEAL ? true : false);
- bool delay = (options & CLUSTER_OPT_DELAY ? true : false);
- bool data_in_ocntl = (options & CLUSTER_OPT_DATA_IS_OCONTROL ? true : false);
- bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
- OutgoingControl *c;
-
- if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
- // Invalid message or node is down, free message data
- if (malloced) {
- ats_free(data);
- }
- if (cmsg) {
- invoke_remote_data_args *args = (invoke_remote_data_args *)
- (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
- ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
+// EThread *thread = this_ethread();
+// ProxyMutex *mutex = thread->mutex;
+// //
+// // RPC facility for intercluster communication available to other
+// // subsystems.
+// //
+// bool steal = (options & CLUSTER_OPT_STEAL ? true : false);
+// bool delay = (options & CLUSTER_OPT_DELAY ? true : false);
+// bool data_in_ocntl = (options & CLUSTER_OPT_DATA_IS_OCONTROL ? true : false);
+// bool malloced = (cluster_fn == CLUSTER_FUNCTION_MALLOCED);
+// OutgoingControl *c;
+//
+// if (!ch || (!malloced && !((unsigned int) cluster_fn < (uint32_t) SIZE_clusterFunction))) {
+// // Invalid message or node is down, free message data
+// if (malloced) {
+// ats_free(data);
+// }
+// if (cmsg) {
+// invoke_remote_data_args *args = (invoke_remote_data_args *)
+// (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
+// ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
+//
+// args->data_oc->freeall();
+// ((OutgoingControl *) cmsg)->freeall();
+// }
+// if (data_in_ocntl) {
+// c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
+// c->freeall();
+// }
+// return -1;
+// }
+//
+// if (data_in_ocntl) {
+// c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
+// } else {
+// c = OutgoingControl::alloc();
+// }
+// CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
+// c->submit_time = ink_get_hrtime();
+// if ((c->zero_body = zero_body)) {
+// c->free_proc = &CacheContinuation::disposeOfDataBuffer;
+// c->free_proc_arg = cc;
+// }
+//
+// if (malloced) {
+// c->set_data((char *) data, len);
+// } else {
+// if (!data_in_ocntl) {
+// c->len = len + sizeof(int32_t);
+// c->alloc_data();
+// }
+// if (!c->fast_data()) {
+// CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
+// }
+// *(int32_t *) c->data = cluster_fn;
+// if (!data_in_ocntl) {
+// memcpy(c->data + sizeof(int32_t), data, len);
+// }
+// }
+//
+// SET_CONTINUATION_HANDLER(c, (OutgoingCtrlHandler) & OutgoingControl::startEvent);
+//
+// /////////////////////////////////////
+// // Compound message adjustments
+// /////////////////////////////////////
+// if (cmsg) {
+// invoke_remote_data_args *args = (invoke_remote_data_args *)
+// (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
+// ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
+// args->msg_oc = c;
+// c = (OutgoingControl *) cmsg;
+// }
+//#ifndef CLUSTER_THREAD_STEALING
+// delay = true;
+//#endif
+// if (!delay) {
+// EThread *tt = this_ethread();
+// {
+// int q = ClusterFuncToQpri(cluster_fn);
+// ink_atomiclist_push(&ch->outgoing_control_al[q], (void *) c);
+//
+// MUTEX_TRY_LOCK(lock, ch->mutex, tt);
+// if (!lock) {
+// if(ch->thread && ch->thread->signal_hook)
+// ch->thread->signal_hook(ch->thread);
+// return 1;
+// }
+// if (steal)
+// ch->steal_thread(tt);
+// return 1;
+// }
+// } else {
+// c->mutex = ch->mutex;
+// eventProcessor.schedule_imm_signal(c);
+// return 0;
+// }
+
+ (void) ch;
+ (void) cluster_fn;
+ (void) data;
+ (void) len;
+ (void) options;
+ (void) cmsg;
- args->data_oc->freeall();
- ((OutgoingControl *) cmsg)->freeall();
- }
- if (data_in_ocntl) {
- c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
- c->freeall();
- }
- return -1;
- }
-
- if (data_in_ocntl) {
- c = *((OutgoingControl **) ((char *) data - sizeof(OutgoingControl *)));
- } else {
- c = OutgoingControl::alloc();
- }
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_SENT_STAT);
- c->submit_time = ink_get_hrtime();
-
- if (malloced) {
- c->set_data((char *) data, len);
- } else {
- if (!data_in_ocntl) {
- c->len = len + sizeof(int32_t);
- c->alloc_data();
- }
- if (!c->fast_data()) {
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_SENT_STAT);
- }
- *(int32_t *) c->data = cluster_fn;
- if (!data_in_ocntl) {
- memcpy(c->data + sizeof(int32_t), data, len);
- }
- }
-
- SET_CONTINUATION_HANDLER(c, (OutgoingCtrlHandler) & OutgoingControl::startEvent);
-
- /////////////////////////////////////
- // Compound message adjustments
- /////////////////////////////////////
- if (cmsg) {
- invoke_remote_data_args *args = (invoke_remote_data_args *)
- (((OutgoingControl *) cmsg)->data + sizeof(int32_t));
- ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
- args->msg_oc = c;
- c = (OutgoingControl *) cmsg;
- }
-#ifndef CLUSTER_THREAD_STEALING
- delay = true;
-#endif
- if (!delay) {
- EThread *tt = this_ethread();
- {
- int q = ClusterFuncToQpri(cluster_fn);
- ink_atomiclist_push(&ch->outgoing_control_al[q], (void *) c);
-
- MUTEX_TRY_LOCK(lock, ch->mutex, tt);
- if (!lock) {
- if(ch->thread && ch->thread->signal_hook)
- ch->thread->signal_hook(ch->thread);
- return 1;
- }
- if (steal)
- ch->steal_thread(tt);
- return 1;
- }
- } else {
- c->mutex = ch->mutex;
- eventProcessor.schedule_imm_signal(c);
- return 0;
- }
+ return 0;
}
int
@@ -162,45 +188,56 @@ ClusterProcessor::invoke_remote_data(ClusterHandler *ch, int cluster_fn,
int dest_channel, ClusterVCToken * token,
void (*bufdata_free_proc) (void *), void *bufdata_free_proc_arg, int options)
{
- if (!buf) {
- // No buffer data, translate this into a invoke_remote() request
- return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL);
- }
- ink_assert(data);
- ink_assert(data_len);
- ink_assert(dest_channel);
- ink_assert(token);
- ink_assert(bufdata_free_proc);
- ink_assert(bufdata_free_proc_arg);
-
- /////////////////////////////////////////////////////////////////////////
- // Build the compound message as described by invoke_remote_data_args.
- /////////////////////////////////////////////////////////////////////////
-
- // Build OutgoingControl for buffer data
- OutgoingControl *bufdata_oc = OutgoingControl::alloc();
- bufdata_oc->set_data(buf, bufdata_free_proc, bufdata_free_proc_arg);
-
- // Build OutgoingControl for compound message header
- invoke_remote_data_args mh;
- mh.msg_oc = 0;
- mh.data_oc = bufdata_oc;
- mh.dest_channel = dest_channel;
- mh.token = *token;
-
- OutgoingControl *chdr = OutgoingControl::alloc();
- chdr->submit_time = ink_get_hrtime();
- chdr->len = sizeof(int32_t) + sizeof(mh);
- chdr->alloc_data();
- *(int32_t *) chdr->data = -1; // always -1 for compound message
- memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
-
- return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
+// if (!buf) {
+// // No buffer data, translate this into a invoke_remote() request
+// return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) NULL, zero_body, bufdata_free_proc_arg);
+// }
+// ink_assert(data);
+// ink_assert(data_len);
+// ink_assert(dest_channel);
+// ink_assert(token);
+// ink_assert(bufdata_free_proc);
+// ink_assert(bufdata_free_proc_arg);
+//
+// /////////////////////////////////////////////////////////////////////////
+// // Build the compound message as described by invoke_remote_data_args.
+// /////////////////////////////////////////////////////////////////////////
+//
+// // Build OutgoingControl for buffer data
+// OutgoingControl *bufdata_oc = OutgoingControl::alloc();
+// bufdata_oc->set_data(buf, bufdata_free_proc, bufdata_free_proc_arg);
+//
+// // Build OutgoingControl for compound message header
+// invoke_remote_data_args mh;
+// mh.msg_oc = 0;
+// mh.data_oc = bufdata_oc;
+// mh.dest_channel = dest_channel;
+// mh.token = *token;
+//
+// OutgoingControl *chdr = OutgoingControl::alloc();
+// chdr->submit_time = ink_get_hrtime();
+// chdr->len = sizeof(int32_t) + sizeof(mh);
+// chdr->alloc_data();
+// *(int32_t *) chdr->data = -1; // always -1 for compound message
+// memcpy(chdr->data + sizeof(int32_t), (char *) &mh, sizeof(mh));
+//
+// return internal_invoke_remote(ch, cluster_fn, data, data_len, options, (void *) chdr);
+
+ (void) ch;
+ (void) cluster_fn;
+ (void) data;
+ (void) data_len;
+ (void) buf;
+ (void) dest_channel;
+ (void) token;
+ (void) bufdata_free_proc;
+ (void) bufdata_free_proc_arg;
+ (void) options;
+ return 0;
}
-// TODO: Why pass in the length here if not used ?
void
-ClusterProcessor::free_remote_data(char *p, int /* l ATS_UNUSED */)
+ClusterProcessor::free_remote_data(char *p, int /* l */)
{
char *d = p - sizeof(int32_t); // reset to ptr to function code
int data_hdr = ClusterControl::DATA_HDR;
@@ -225,65 +262,70 @@ ClusterProcessor::free_remote_data(char *p, int /* l ATS_UNUSED */)
}
ClusterVConnection *
-ClusterProcessor::open_local(Continuation * cont, ClusterMachine */* m ATS_UNUSED */, ClusterVCToken & token, int options)
+ClusterProcessor::open_local(Continuation * cont, ClusterMachine * m, ClusterVCToken & token, int options)
{
- //
- // New connect protocol.
- // As a VC initiator, establish the VC connection to the remote node
- // by allocating the VC locally and requiring the caller to pass the
- // token and channel id in the remote request. The remote handler calls
- // connect_local to establish the remote side of the connection.
- //
- bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
- bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
-
- ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
- if (!ch)
- return NULL;
- EThread *t = ch->thread;
- if (!t)
- return NULL;
-
- EThread *thread = this_ethread();
- ProxyMutex *mutex = thread->mutex;
- ClusterVConnection *vc = clusterVCAllocator.alloc();
- vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
- vc->start_time = ink_get_hrtime();
- vc->last_activity_time = vc->start_time;
- vc->ch = ch;
- vc->token.alloc();
- vc->token.ch_id = ch->id;
- token = vc->token;
-#ifdef CLUSTER_THREAD_STEALING
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
- MUTEX_TRY_LOCK(lock, ch->mutex, thread);
- if (!lock) {
-#endif
- if (immediate) {
- clusterVCAllocator_free(vc);
- return NULL;
- }
- vc->action_ = cont;
- ink_atomiclist_push(&ch->external_incoming_open_local, (void *) vc);
- if(ch->thread && ch->thread->signal_hook)
- ch->thread->signal_hook(ch->thread);
- return CLUSTER_DELAYED_OPEN;
-
-#ifdef CLUSTER_THREAD_STEALING
- } else {
- if (!(immediate || allow_immediate))
- vc->action_ = cont;
- if (vc->start(thread) < 0) {
- return NULL;
- }
- if (immediate || allow_immediate) {
- return vc;
- } else {
- return CLUSTER_DELAYED_OPEN;
- }
- }
-#endif
+// //
+// // New connect protocol.
+// // As a VC initiator, establish the VC connection to the remote node
+// // by allocating the VC locally and requiring the caller to pass the
+// // token and channel id in the remote request. The remote handler calls
+// // connect_local to establish the remote side of the connection.
+// //
+// bool immediate = ((options & CLUSTER_OPT_IMMEDIATE) ? true : false);
+// bool allow_immediate = ((options & CLUSTER_OPT_ALLOW_IMMEDIATE) ? true : false);
+//
+// ClusterHandler *ch = ((CacheContinuation *)cont)->ch;
+// if (!ch)
+// return NULL;
+// EThread *t = ch->thread;
+// if (!t)
+// return NULL;
+//
+// EThread *thread = this_ethread();
+// ProxyMutex *mutex = thread->mutex;
+// ClusterVConnection *vc = clusterVCAllocator.alloc();
+// vc->new_connect_read = (options & CLUSTER_OPT_CONN_READ ? 1 : 0);
+// vc->start_time = ink_get_hrtime();
+// vc->last_activity_time = vc->start_time;
+// vc->ch = ch;
+// vc->token.alloc();
+// vc->token.ch_id = ch->id;
+// token = vc->token;
+//#ifdef CLUSTER_THREAD_STEALING
+// CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPENNED_STAT);
+// CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT);
+// MUTEX_TRY_LOCK(lock, ch->mutex, thread);
+// if (!lock) {
+//#endif
+// if (immediate) {
+// clusterVCAllocator_free(vc);
+// return NULL;
+// }
+// vc->action_ = cont;
+// ink_atomiclist_push(&ch->external_incoming_open_local, (void *) vc);
+// if(ch->thread && ch->thread->signal_hook)
+// ch->thread->signal_hook(ch->thread);
+// return CLUSTER_DELAYED_OPEN;
+//
+//#ifdef CLUSTER_THREAD_STEALING
+// } else {
+// if (!(immediate || allow_immediate))
+// vc->action_ = cont;
+// if (vc->start(thread) < 0) {
+// return NULL;
+// }
+// if (immediate || allow_immediate) {
+// return vc;
+// } else {
+// return CLUSTER_DELAYED_OPEN;
+// }
+// }
+//#endif
+ (void) cont;
+ (void) m;
+ (void) token;
+ (void) options;
+ return NULL;
}
ClusterVConnection *
@@ -367,9 +409,10 @@ bool ClusterProcessor::disable_remote_cluster_ops(ClusterMachine * m)
// Simplify debug access to stats
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
-
+/*
GlobalClusterPeriodicEvent *
PeriodicClusterEvent;
+*/
#ifdef CLUSTER_TOMCAT
extern int cache_clustering_enabled;
@@ -386,6 +429,77 @@ unsigned long cluster_packet_tos = 0;
int RPC_only_CacheCluster = 0;
#endif
+static int machine_change_notify(ClusterMachine * m)
+{
+ //char textbuf[sizeof("255.255.255.255:65535")];
+ int result;
+
+ Debug("cluster_io", "start notify, machine %s %hhu.%hhu.%hhu.%hhu:%d, version: %d.%d",
+ m->dead ? "down" : "up",
+ DOT_SEPARATED(m->ip), m->cluster_port, m->msg_proto_major,
+ m->msg_proto_minor);
+
+ if (m->dead) {
+ ClusterConfiguration *c = this_cluster()->current_configuration();
+ if (c->find(m->ip, m->cluster_port)) {
+ ClusterConfiguration *cc = configuration_remove_machine(c, m);
+ //CLUSTER_DECREMENT_DYN_STAT(CLUSTER_NODES_STAT);
+ this_cluster()->configurations.push(cc);
+ result = 0;
+ }
+ else {
+ result = ENOENT;
+ }
+
+ Note("machine down %hhu.%hhu.%hhu.%hhu:%d, version=%d.%d",
+ DOT_SEPARATED(m->ip), m->cluster_port, m->msg_proto_major,
+ m->msg_proto_minor);
+ /*
+ snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(m->ip), m->cluster_port);
+ REC_SignalManager(REC_SIGNAL_MACHINE_DOWN, textbuf);
+ */
+ }
+ else {
+ ClusterConfiguration *c = this_cluster()->current_configuration();
+ if (c->find(m->ip, m->cluster_port)) {
+ Warning("machine %hhu.%hhu.%hhu.%hhu:%d already up",
+ DOT_SEPARATED(m->ip), m->cluster_port);
+ result = EEXIST;
+ }
+ else {
+ ClusterConfiguration *cconf = configuration_add_machine(c, m);
+ //CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT);
+ this_cluster()->configurations.push(cconf);
+ result = 0;
+ }
+
+ Note("machine up %hhu.%hhu.%hhu.%hhu:%d, version=%d.%d",
+ DOT_SEPARATED(m->ip), m->cluster_port, m->msg_proto_major,
+ m->msg_proto_minor);
+
+ /*
+ snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(m->ip), m->cluster_port);
+ REC_SignalManager(REC_SIGNAL_MACHINE_UP, textbuf);
+ */
+ }
+
+ return result;
+}
+
+
+static int
+cluster_ping_config_cb(const char *name, RecDataT /* data_type */, RecData data, void * /* cookie */)
+{
+ if (strcmp(name, "proxy.config.cluster.ping_send_interval_msecs") == 0) {
+ cluster_ping_send_interval = data.rec_int * HRTIME_MSECOND;
+ }
+ else if (strcmp(name, "proxy.config.cluster.ping_latency_threshold_msecs") == 0) {
+ cluster_ping_latency_threshold = data.rec_int * HRTIME_MSECOND;
+ }
+
+ return 0;
+}
+
int
ClusterProcessor::init()
{
@@ -686,6 +800,11 @@ ClusterProcessor::init()
if (num_of_cluster_threads == DEFAULT_NUMBER_OF_CLUSTER_THREADS)
REC_ReadConfigInteger(num_of_cluster_threads, "proxy.config.cluster.threads");
+ REC_ReadConfigInteger(num_of_cluster_connections, "proxy.config.cluster.connections");
+ if (num_of_cluster_connections == 0) {
+ num_of_cluster_connections = num_of_cluster_threads;
+ }
+
REC_EstablishStaticConfigInt32(CacheClusterMonitorEnabled, "proxy.config.cluster.enable_monitor");
REC_EstablishStaticConfigInt32(CacheClusterMonitorIntervalSecs, "proxy.config.cluster.monitor_interval_secs");
REC_ReadConfigInteger(cluster_receive_buffer_size, "proxy.config.cluster.receive_buffer_size");
@@ -695,17 +814,28 @@ ClusterProcessor::init()
REC_ReadConfigInteger(cluster_packet_tos, "proxy.config.cluster.sock_packet_tos");
REC_EstablishStaticConfigInt32(RPC_only_CacheCluster, "proxy.config.cluster.rpc_cache_cluster");
+ REC_EstablishStaticConfigInteger(cluster_flow_ctrl_min_bps, "proxy.config.cluster.flow_ctrl.min_bps");
+ REC_EstablishStaticConfigInteger(cluster_flow_ctrl_max_bps, "proxy.config.cluster.flow_ctrl.max_bps");
+ REC_EstablishStaticConfigInt32(cluster_send_min_wait_time, "proxy.config.cluster.flow_ctrl.min_send_wait_time");
+ REC_EstablishStaticConfigInt32(cluster_send_max_wait_time, "proxy.config.cluster.flow_ctrl.max_send_wait_time");
+ REC_EstablishStaticConfigInt32(cluster_min_loop_interval, "proxy.config.cluster.flow_ctrl.min_loop_interval");
+ REC_EstablishStaticConfigInt32(cluster_max_loop_interval, "proxy.config.cluster.flow_ctrl.max_loop_interval");
+
int cluster_type = 0;
REC_ReadConfigInteger(cluster_type, "proxy.local.cluster.type");
create_this_cluster_machine();
+
+ /*
#ifdef NON_MODULAR
// Cluster API Initializations
clusterAPI_init();
#endif
+
// Start global Cluster periodic event
PeriodicClusterEvent = NEW(new GlobalClusterPeriodicEvent);
PeriodicClusterEvent->init();
+ */
this_cluster = NEW(new Cluster);
ClusterConfiguration *cc = NEW(new ClusterConfiguration);
@@ -713,19 +843,61 @@ ClusterProcessor::init()
cc->n_machines = 1;
cc->machines[0] = this_cluster_machine();
memset(cc->hash_table, 0, CLUSTER_HASH_TABLE_SIZE);
- // 0 dummy output data
+ /*
+ // 0 dummy output data
memset(channel_dummy_output, 0, sizeof(channel_dummy_output));
+ */
+
+ int result;
if (cluster_type == 1) {
- cache_clustering_enabled = 1;
- Note("cache clustering enabled");
- compute_cluster_mode();
+ REC_ReadConfigInteger(cluster_ping_send_interval, "proxy.config.cluster.ping_send_interval_msecs");
+ REC_ReadConfigInteger(cluster_ping_latency_threshold, "proxy.config.cluster.ping_latency_threshold_msecs");
+ cluster_ping_send_interval *= HRTIME_MSECOND;
+ cluster_ping_latency_threshold *= HRTIME_MSECOND;
+
+ REC_RegisterConfigUpdateFunc("proxy.config.cluster.ping_send_interval_msecs", cluster_ping_config_cb, NULL);
+ REC_RegisterConfigUpdateFunc("proxy.config.cluster.ping_latency_threshold_msecs", cluster_ping_config_cb, NULL);
+ REC_EstablishStaticConfigInt32(cluster_ping_retries, "proxy.config.cluster.ping_retries");
+
+ REC_ReadConfigInteger(max_session_count_per_machine, "proxy.config.cluster.max_sessions_per_machine");
+ REC_ReadConfigInteger(session_lock_count_per_machine, "proxy.config.cluster.session_locks_per_machine");
+
+ bool found;
+ IpEndpoint cluster_ip; // ip addr of the cluster interface
+ char *intrName; // Name of the interface we are to use
+ intrName = REC_readString("proxy.config.cluster.ethernet_interface", &found);
+ ink_assert(found && intrName != NULL);
+
+ found = mgmt_getAddrForIntr(intrName, &cluster_ip.sa);
+ if (!found) {
+ ink_fatal(1, "[ClusterProcessor::init] Unable to find network interface %s. Exiting...\n", intrName);
+ } else if (!ats_is_ip4(&cluster_ip)) {
+ ink_fatal(1, "[ClusterProcessor::init] Unable to find IPv4 network interface %s. Exiting...\n", intrName);
+ }
+
+ if (num_of_cluster_connections % 2 != 0) {
+ num_of_cluster_connections++;
+ }
+ cluster_global_init(cluster_main_handler, machine_change_notify);
+
+ result = connection_manager_init(cluster_ip.sin.sin_addr.s_addr);
+ if (result == 0) {
+ cache_clustering_enabled = 1;
+ Note("cache clustering enabled");
+ compute_cluster_mode();
+ }
+ else {
+ cache_clustering_enabled = 0;
+ Note("init fail, cache clustering disabled");
+ }
} else {
cache_clustering_enabled = 0;
Note("cache clustering disabled");
+ result = 0;
}
- return 0;
+ return result;
}
// function added to adhere to the name calling convention of init functions
@@ -742,13 +914,18 @@ ClusterProcessor::start()
this_cluster_machine()->cluster_port = cluster_port;
#endif
if (cache_clustering_enabled && (cacheProcessor.IsCacheEnabled() == CACHE_INITIALIZED)) {
- size_t stacksize;
- REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize");
- ET_CLUSTER = eventProcessor.spawn_event_threads(num_of_cluster_threads, "ET_CLUSTER", stacksize);
+ /*
+ ET_CLUSTER = eventProcessor.spawn_event_threads(num_of_cluster_threads, "ET_CLUSTER");
for (int i = 0; i < eventProcessor.n_threads_for_type[ET_CLUSTER]; i++) {
- initialize_thread_for_net(eventProcessor.eventthread[ET_CLUSTER][i]);
+ initialize_thread_for_net(eventProcessor.eventthread[ET_CLUSTER][i], i);
+#ifndef STANDALONE_IOCORE
+ extern void initialize_thread_for_http_sessions(EThread *thread, int thread_index);
+ initialize_thread_for_http_sessions(eventProcessor.eventthread[ET_CLUSTER][i], i);
+#endif
}
+ */
+
REC_RegisterConfigUpdateFunc("proxy.config.cluster.cluster_configuration", machine_config_change, (void *) CLUSTER_CONFIG);
do_machine_config_change((void *) CLUSTER_CONFIG, "proxy.config.cluster.cluster_configuration");
// TODO: Remove this?
@@ -757,9 +934,12 @@ ClusterProcessor::start()
do_machine_config_change((void *) MACHINE_CONFIG, "proxy.config.cluster.machine_configuration");
#endif
- accept_handler = NEW(new ClusterAccept(&cluster_port, cluster_receive_buffer_size, cluster_send_buffer_size));
- accept_handler->Init();
+ //accept_handler = NEW(new ClusterAccept(&cluster_port, cluster_receive_buffer_size, cluster_send_buffer_size));
+ //accept_handler->Init();
+
+ connection_manager_start();
}
+
return 0;
}
@@ -846,4 +1026,53 @@ ClusterProcessor::compute_cluster_mode()
}
}
+
+void cluster_main_handler(ClusterSession session, void *context,
+ const int func_id, IOBufferBlock *data, const int data_len)
+{
+ int event = func_id < 0 ? -func_id: func_id;
+ switch (event) {
+ case CLUSTER_CACHE_DATA_ABORT:
+ case CLUSTER_CACHE_DATA_READ_REENABLE: {
+ ink_assert(data_len == 0 && context && data == NULL);
+ CacheContinuation *cc = (CacheContinuation *) context;
+ cc->thread->schedule_imm(cc, event);
+ return;
+ }
+ default: {
+ ClusterCont *cc = clusterContAllocator.alloc();
+ SET_CONTINUATION_HANDLER(cc, &ClusterCont::handleEvent);
+ cc->session = session;
+ cc->context = context;
+ cc->func_id = event;
+ cc->data = data;
+ cc->data_len = data_len;
+ cc->_action = (Continuation *) context;
+ if (cc->_action.continuation) {
+ cc->mutex = cc->_action.mutex;
+ }
+#ifdef DEBUG
+ int64_t nbytes = 0;
+ for (IOBufferBlock *b = data; b; b = b->next) {
+ nbytes += b->read_avail();
+ }
+ ink_assert(data_len == nbytes);
+#endif
+
+ if (event == CLUSTER_CACHE_DATA_READ_DONE
+ || event == CLUSTER_CACHE_DATA_ERROR
+ || event == CLUSTER_CACHE_OP_RESULT_CLUSTER_FUNCTION) {
+ ink_assert(context);
+ ClusterCacheVC *cvc = (ClusterCacheVC *) context;
+ cvc->initial_thread->schedule_imm(cc);
+ return;
+ }
+
+ eventProcessor.schedule_imm(cc);
+ return;
+ }
+ }
+}
+
+
// End of ClusterProcessor.cc
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/ClusterVConnection.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterVConnection.cc b/iocore/cluster/ClusterVConnection.cc
index 0630edf..a76c912 100644
--- a/iocore/cluster/ClusterVConnection.cc
+++ b/iocore/cluster/ClusterVConnection.cc
@@ -30,6 +30,17 @@
#include "P_Cluster.h"
ClassAllocator<ClusterVConnection> clusterVCAllocator("clusterVCAllocator");
ClassAllocator<ByteBankDescriptor> byteBankAllocator("byteBankAllocator");
+ClassAllocator<ClusterCacheVC> clusterCacheVCAllocator("clusterCacheVCAllocator");
+
+// Bytes zeroed by the ClusterCacheVC constructor (from 'vio' to the end of
+// the object); -1 until the first instance is constructed.
+int ClusterCacheVC::size_to_init = -1;
+
+// openWriteMain() ships a partial (sub-fragment) chunk once at least this
+// many bytes are pending, or when the write VIO is complete.
+#define CLUSTER_WRITE_MIN_SIZE (1 << 16)
+
+// Close the cluster session and record that it is closed. Wrapped in
+// do { } while (0) so "CLUSTER_CACHE_VC_CLOSE_SESSION;" is exactly one
+// statement and stays correct inside an unbraced if/else.
+#define CLUSTER_CACHE_VC_CLOSE_SESSION \
+  do {                                 \
+    cluster_close_session(cs);         \
+    session_closed = true;             \
+  } while (0)
ByteBankDescriptor *
ByteBankDescriptor::ByteBankDescriptor_alloc(IOBufferBlock * iob)
@@ -271,13 +282,13 @@ ClusterVConnection::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader
+// Close this cluster VC. After this change it only delegates to
+// ClusterVConnectionBase::do_io_close().
void
ClusterVConnection::do_io_close(int alerrno)
{
- if ((type == VC_CLUSTER) && current_cont) {
- if (((CacheContinuation *)current_cont)->read_cluster_vc == this)
- type = VC_CLUSTER_READ;
- else if (((CacheContinuation *)current_cont)->write_cluster_vc == this)
- type = VC_CLUSTER_WRITE;
- }
- ch->vcs_push(this, type);
+// NOTE(review): the read/write type fix-up and ch->vcs_push() below are
+// disabled by this change, so only the base-class close runs. Confirm that
+// nothing still depends on vcs_push() and then delete the dead code.
+// if ((type == VC_CLUSTER) && current_cont) {
+// if (((CacheContinuation *)current_cont)->read_cluster_vc == this)
+// type = VC_CLUSTER_READ;
+// else if (((CacheContinuation *)current_cont)->write_cluster_vc == this)
+// type = VC_CLUSTER_WRITE;
+// }
+// ch->vcs_push(this, type);
ClusterVConnectionBase::do_io_close(alerrno);
}
@@ -650,4 +661,527 @@ ClusterVConnection::get_disk_io_priority()
return disk_io_priority;
}
+
+// Zero every member from 'vio' to the end of the object and cache that byte
+// count in the static size_to_init. The offset is taken from 'this' rather
+// than via the ((T *) 0)->member idiom, which dereferences a null pointer
+// (undefined behavior).
+ClusterCacheVC::ClusterCacheVC()
+{
+  char *const first = (char *) &vio;
+  char *const end = (char *) this + sizeof(ClusterCacheVC);
+  size_to_init = (int) (end - first);
+  memset(first, 0, size_to_init);
+}
+
+// Send the next read request over the cluster session and route the reply to
+// openReadReadDone(). The first request (VIO non-empty, nothing received yet)
+// is READ_BEGIN carrying the byte count and pread offset; later requests are
+// READ_REENABLE. If the send fails, the session is closed and the user gets
+// VC_EVENT_ERROR.
+int
+ClusterCacheVC::handleRead(int, void *)
+{
+  ink_assert(!in_progress && !remote_closed);
+  PUSH_HANDLER(&ClusterCacheVC::openReadReadDone);
+
+  bool send_failed;
+  if (vio.nbytes > 0 && total_len == 0) {
+    SetIOReadMessage msg;
+    msg.nbytes = vio.nbytes;
+    msg.offset = seek_to;
+    send_failed = cluster_send_message(cs, -CLUSTER_CACHE_DATA_READ_BEGIN, (char *) &msg,
+                                       sizeof(msg), PRIORITY_HIGH) != 0;
+  } else {
+    send_failed = cluster_send_message(cs, -CLUSTER_CACHE_DATA_READ_REENABLE, NULL, 0,
+                                       PRIORITY_HIGH) != 0;
+  }
+
+  if (send_failed) {
+    CLUSTER_CACHE_VC_CLOSE_SESSION;
+    return calluser(VC_EVENT_ERROR);
+  }
+
+  in_progress = true;
+  cluster_set_events(cs, RESPONSE_EVENT_NOTIFY_DEALER);
+  return EVENT_CONT;
+}
+
+// Handler pushed by handleRead() while a read request is outstanding.
+// EVENT_IMMEDIATE (a stray trigger) is ignored. Any other event completes the
+// request: the saved handler is restored, the reply is decoded (data blocks,
+// a remote error code, or an internal error), and then the VC is either freed
+// (the user already closed it) or the event is re-dispatched via handleEvent().
+int
+ClusterCacheVC::openReadReadDone(int event, void *data)
+{
+ cancel_trigger();
+ ink_assert(in_progress);
+ if (event == EVENT_IMMEDIATE)
+ return EVENT_CONT;
+
+ in_progress = false;
+ POP_HANDLER;
+
+ switch (event) {
+ case CLUSTER_CACHE_DATA_ERROR:
+ {
+ ClusterCont *cc = (ClusterCont *) data;
+ ink_assert(cc && cc->data_len > 0);
+ remote_closed = true;
+ // The payload is an int event code from the remote side.
+ // NOTE(review): only data_len > 0 is asserted, not >= sizeof(int).
+ event = *(int *) cc->data->start();
+ break;
+ }
+ case CLUSTER_CACHE_DATA_READ_DONE:
+ {
+ ClusterCont *cc = (ClusterCont *) data;
+ // The previous chunk must have been fully consumed by openReadMain().
+ ink_assert(cc && d_len == 0);
+
+ d_len = cc->data_len;
+ total_len += d_len;
+ blocks = cc->data;
+ // Everything requested has arrived: the remote side is finished.
+ if (total_len >= vio.nbytes)
+ remote_closed = true;
+ break;
+ }
+ case CLUSTER_INTERNEL_ERROR:
+ default:
+ event = VC_EVENT_ERROR;
+ remote_closed = true;
+ break;
+ }
+
+ // The user closed the VC while the request was in flight: abort the remote
+ // side if it is still live and free the VC here.
+ if (closed) {
+ if (!remote_closed)
+ cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+
+ free_ClusterCacheVC(this);
+ return EVENT_DONE;
+ }
+ // Pass the received data (or error) to the restored handler.
+
+ return handleEvent(event, data);
+}
+
+// Handler for the remote answer to an open-read (in_progress is set while it
+// is outstanding).
+//  - caller cancelled the action: abort the remote side if live, free the VC;
+//  - CACHE_EVENT_OPEN_WRITE: the remote side started a write instead, so the
+//    VC switches to write mode (openWriteMain) and is handed to the caller
+//    together with OPEN_READ_FAILED;
+//  - any other non-OPEN_READ event: close the session, report
+//    OPEN_READ_FAILED with 'data', free the VC;
+//  - OPEN_READ: switch to openReadMain() and report success via callcont().
+int
+ClusterCacheVC::openReadStart(int event, void *data)
+{
+ ink_assert(in_progress);
+ in_progress = false;
+ if (_action.cancelled) {
+ if (!remote_closed)
+ cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+ free_ClusterCacheVC(this);
+ return EVENT_DONE;
+ }
+ if (event != CACHE_EVENT_OPEN_READ) {
+ if (event == CACHE_EVENT_OPEN_WRITE) {
+ // the remote side did the pre_write: this VC becomes the writer
+ vio.op = VIO::WRITE;
+ SET_HANDLER(&ClusterCacheVC::openWriteMain);
+ _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, this);
+ return EVENT_DONE;
+ }
+ // prevent further trigger
+ remote_closed = true;
+ CLUSTER_CACHE_VC_CLOSE_SESSION;
+ _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, data);
+ free_ClusterCacheVC(this);
+ return EVENT_DONE;
+ }
+
+ SET_HANDLER(&ClusterCacheVC::openReadMain);
+ callcont(CACHE_EVENT_OPEN_READ);
+ return EVENT_CONT;
+}
+// Main read loop. It moves the chunk buffered by openReadReadDone() ('blocks',
+// d_len bytes) into the user's VIO buffer and reports READ_READY or
+// READ_COMPLETE. When nothing is buffered it requests more via handleRead().
+// EOS is reported once doc_len bytes have been delivered. Remote errors and
+// EOS close the session and are forwarded to the user.
+int
+ClusterCacheVC::openReadMain(int event, void *e)
+{
+ cancel_trigger();
+ ink_assert(!in_progress);
+ if (event == VC_EVENT_ERROR || event == VC_EVENT_EOS) {
+ remote_closed = true;
+ CLUSTER_CACHE_VC_CLOSE_SESSION;
+ return calluser(event);
+ }
+
+ int64_t bytes = d_len;
+ int64_t ntodo = vio.ntodo();
+ if (ntodo <= 0)
+ return EVENT_CONT;
+ if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // user buffer already above its water mark: wait for reenable
+ return EVENT_CONT;
+ if (!blocks && vio.ntodo() > 0)
+ goto Lread;
+
+ if (bytes > vio.ntodo())
+ bytes = vio.ntodo();
+ // NOTE(review): the whole 'blocks' chain is appended even when 'bytes' was
+ // clipped to vio.ntodo() above, so the user buffer may receive more than
+ // ntodo bytes while d_len stays non-zero -- confirm this cannot happen.
+ vio.buffer.writer()->append_block(blocks);
+ vio.ndone += bytes;
+ blocks = NULL;
+ d_len -= bytes;
+
+ if (vio.ntodo() <= 0)
+ return calluser(VC_EVENT_READ_COMPLETE);
+ else {
+ if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
+ return EVENT_DONE;
+ // we have to keep reading until we give the user all the
+ // bytes it wanted or we hit the watermark.
+ if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water())
+ goto Lread;
+ return EVENT_CONT;
+ }
+Lread:
+ if (vio.ndone >= (int64_t) doc_len) {
+ // reached the end of the document and the user still wants more
+ return calluser(VC_EVENT_EOS);
+ }
+ // if the state machine calls reenable on the callback from the cache,
+ // we set up a schedule_imm event. The openReadReadDone discards
+ // EVENT_IMMEDIATE events. So, we have to cancel that trigger and set
+ // a new EVENT_INTERVAL event.
+ // NOTE(review): the comment above looks inherited from CacheVC; here the
+ // pending trigger is cancelled and handleRead() is called directly.
+ cancel_trigger();
+ return handleRead(event, e);
+}
+
+// Handler for the remote answer to an open-write (in_progress is set while
+// it is outstanding).
+//  - caller cancelled: abort the remote side if it is live, free the VC;
+//  - CACHE_EVENT_OPEN_WRITE: switch to openWriteMain() and report success;
+//  - anything else: close the session, report OPEN_WRITE_FAILED with 'data'
+//    and free the VC.
+int
+ClusterCacheVC::openWriteStart(int event, void *data)
+{
+  ink_assert(in_progress);
+  in_progress = false;
+
+  if (_action.cancelled) {
+    if (!remote_closed)
+      cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+    free_ClusterCacheVC(this);
+    return EVENT_DONE;
+  }
+
+  if (event == CACHE_EVENT_OPEN_WRITE) {
+    SET_HANDLER(&ClusterCacheVC::openWriteMain);
+    return callcont(CACHE_EVENT_OPEN_WRITE);
+  }
+
+  // Open failed: mark the remote side finished so no further trigger fires.
+  remote_closed = true;
+  CLUSTER_CACHE_VC_CLOSE_SESSION;
+  _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, data);
+  free_ClusterCacheVC(this);
+  return EVENT_DONE;
+}
+
+// Main write loop. Pulls up to vio.ntodo() bytes from the user's reader into
+// the pending chain ('blocks', starting at 'offset', 'length' bytes unsent).
+// Every full cache_config_target_fragment_size piece is sent as a WRITE_DONE
+// message. The remainder is sent once it reaches CLUSTER_WRITE_MIN_SIZE bytes
+// or the VIO is fully consumed. A failed send sets remote_closed and reports
+// VC_EVENT_ERROR to the user.
+int
+ClusterCacheVC::openWriteMain(int, void *)
+{
+  cancel_trigger();
+  ink_assert(!in_progress);
+
+Lagain:
+  if (remote_closed) {
+    if (calluser(VC_EVENT_ERROR) == EVENT_DONE)
+      return EVENT_DONE;
+    return EVENT_CONT;
+  }
+
+  if (!vio.buffer.writer()) {
+    if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
+      return EVENT_DONE;
+    if (!vio.buffer.writer())
+      return EVENT_CONT;
+  }
+
+  int64_t ntodo = vio.ntodo();
+
+  if (ntodo <= 0) {
+    if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
+      return EVENT_DONE;
+    ink_assert(!"close expected after write COMPLETE");
+    if (vio.ntodo() <= 0)
+      return EVENT_CONT;
+  }
+
+  // Bytes still to ship = unread VIO bytes + bytes already pending in 'blocks'.
+  ntodo = vio.ntodo() + length;
+  int64_t total_avail = vio.buffer.reader()->read_avail();
+  int64_t avail = total_avail;
+  int64_t towrite = avail + length;
+  if (towrite > ntodo) {
+    // Never take more from the reader than the VIO asked for.
+    avail -= (towrite - ntodo);
+    towrite = ntodo;
+  }
+
+  if (!blocks && towrite) {
+    // Start a new pending chain at the reader's current position.
+    blocks = vio.buffer.reader()->block;
+    offset = vio.buffer.reader()->start_offset;
+  }
+
+  if (avail > 0) {
+    vio.buffer.reader()->consume(avail);
+    vio.ndone += avail;
+    total_len += avail;
+  }
+
+  ink_assert(towrite >= 0);
+  length = towrite;
+
+  int flen = cache_config_target_fragment_size;
+
+  // Ship every complete fragment.
+  while (length >= flen) {
+    IOBufferBlock *r = clone_IOBufferBlockList(blocks, offset, flen);
+    blocks = iobufferblock_skip(blocks, &offset, &length, flen);
+
+    remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, r, -1,
+                                         priority);
+    if (remote_closed)
+      goto Lagain;
+
+    data_sent += flen;
+    Debug("data_sent", "sent bytes %d, reminds %" PRId64, flen, length);
+  }
+
+  // Ship the tail early once it is reasonably large, or when the VIO is done.
+  // Per the original author, this makes read_from_writer work better,
+  // especially with a slow origin.
+  flen = CLUSTER_WRITE_MIN_SIZE;
+  if (length >= flen || (vio.ntodo() <= 0 && length > 0)) {
+    // Capture the size before iobufferblock_skip() drains 'length' to 0, so
+    // the debug output reports the bytes actually sent. The tail is shorter
+    // than one fragment here, so it fits in an int.
+    const int64_t sent = length;
+    data_sent += sent;
+    IOBufferBlock *r = clone_IOBufferBlockList(blocks, offset, sent);
+    blocks = iobufferblock_skip(blocks, &offset, &length, sent);
+    remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, r,
+                                         -1, priority);
+    if (remote_closed)
+      goto Lagain;
+    Debug("data_sent", "sent bytes %d, reminds %" PRId64, (int) sent, length);
+  }
+
+  if (vio.ntodo() <= 0) {
+    ink_assert(length == 0 && total_len == vio.nbytes);
+    return calluser(VC_EVENT_WRITE_COMPLETE);
+  }
+  return calluser(VC_EVENT_WRITE_READY);
+}
+
+// Completion handler for an outstanding remote request (presumably the
+// remote remove, given the name -- confirm at the call site). The remote
+// side is finished, so the session is closed. The result is forwarded to the
+// caller unless it cancelled the action, and the VC is freed.
+int
+ClusterCacheVC::removeEvent(int event, void *data)
+{
+ ink_assert(in_progress);
+ in_progress = false;
+ remote_closed = true;
+ CLUSTER_CACHE_VC_CLOSE_SESSION;
+ if (!_action.cancelled)
+ _action.continuation->handleEvent(event, data);
+ free_ClusterCacheVC(this);
+ return EVENT_DONE;
+}
+
+// Start reading nbytes of the opened document into abuf from its beginning.
+// This is exactly do_io_pread() with offset 0, so it delegates (non-virtual
+// call) instead of duplicating the VIO setup, keeping the two paths in sync.
+VIO *
+ClusterCacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
+{
+  return ClusterCacheVC::do_io_pread(c, nbytes, abuf, 0);
+}
+
+// Start reading nbytes into abuf beginning at byte 'offset' of the document.
+// The offset is kept in seek_to, which handleRead() sends in READ_BEGIN.
+// The VC then runs from an immediate event on the caller's thread, unless an
+// event is already pending (trigger) or we are inside a callback (recursive).
+// Returns the VC's read VIO.
+VIO *
+ClusterCacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
+{
+ ink_assert(vio.op == VIO::READ && alternate.valid());
+ vio.buffer.writer_for(abuf);
+ vio.set_continuation(c);
+ vio.ndone = 0;
+ vio.nbytes = nbytes;
+ vio.vc_server = this;
+ seek_to = offset;
+ ink_assert(c->mutex->thread_holding);
+
+ ink_assert(!in_progress);
+ if (!trigger && !recursive)
+ trigger = c->mutex->thread_holding->schedule_imm_local(this);
+ return &vio;
+}
+
+// Start writing nbytes from abuf to the remote cache. It sets up the write
+// VIO, picks a message priority from the announced size, and immediately
+// sends WRITE_BEGIN. For HTTP fragments, WRITE_BEGIN carries a
+// SetIOWriteMessage followed by the marshalled alternate in one IOBufferBlock;
+// otherwise it carries only the message. A send failure is stored in
+// remote_closed and reported by openWriteMain() on its next run. Returns the
+// VC's write VIO.
+VIO *
+ClusterCacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
+{
+ ink_assert(vio.op == VIO::WRITE);
+ ink_assert(!owner && !in_progress);
+ vio.buffer.reader_for(abuf);
+ vio.set_continuation(c);
+ vio.ndone = 0;
+ vio.nbytes = nbytes;
+ doc_len = nbytes; // note: doc_len may not be the real body length; do_io_close() compares it with vio.nbytes
+ vio.vc_server = this;
+ ink_assert(c->mutex->thread_holding);
+
+ // Documents under 1 MiB get PRIORITY_MID, larger ones PRIORITY_LOW.
+ if (nbytes < (1 << 20))
+ priority = PRIORITY_MID;
+ else
+ priority = PRIORITY_LOW;
+
+ CacheHTTPInfo *r = &alternate;
+ SetIOWriteMessage msg;
+ msg.nbytes = nbytes;
+ int len = r->valid() ? r->marshal_length() : 0;
+ msg.hdr_len = len;
+ ink_assert(total_len == 0);
+ ink_assert((frag_type == CACHE_FRAG_TYPE_HTTP && len > 0) ||
+ (frag_type != CACHE_FRAG_TYPE_HTTP && len == 0));
+
+ if (len > 0) {
+ // Header present: message + marshalled alternate in a single block.
+ Ptr<IOBufferData> data;
+ data = new_IOBufferData(iobuffer_size_to_index(sizeof msg + len, MAX_BUFFER_SIZE_INDEX));
+ memcpy((char *) data->data(), &msg, sizeof(msg));
+ char *p = (char *) data->data() + sizeof msg;
+ int res = r->marshal(p, len);
+ ink_assert(res >= 0);
+ IOBufferBlock *ret = new_IOBufferBlock(data, sizeof msg + len, 0);
+ ret->_buf_end = ret->_end;
+ remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_BEGIN, ret, -1, priority);
+ } else
+ remote_closed = cluster_send_message(cs, -CLUSTER_CACHE_DATA_WRITE_BEGIN, &msg, sizeof msg, priority);
+
+ if (!trigger && !recursive)
+ trigger = c->mutex->thread_holding->schedule_imm_local(this);
+ return &vio;
+}
+
+// Close the VC. alerrno == -1 (the default) is a normal close (closed = 1);
+// any other value is an abort (closed = -1). f.force_empty forces a normal
+// close. While the remote side is still live:
+//  - normal close of a writer that wrote no body (f.update with nbytes == 0,
+//    or force_empty): send a header-only update, or DATA_CLOSE;
+//  - otherwise flush every pending byte as WRITE_DONE messages (aborting if
+//    the writer still owes data), and send DATA_CLOSE when the final length
+//    differs from the doc_len announced in do_io_write();
+//  - aborted writer, or reader with no request in flight: send DATA_ABORT.
+// The VC is freed here unless it was already closed, we are inside a callback
+// (recursive), or a remote request is outstanding (in_progress). In the last
+// case the pending handler frees it (e.g. openReadReadDone() on 'closed').
+void
+ClusterCacheVC::do_io_close(int alerrno)
+{
+  ink_assert(mutex->thread_holding == this_ethread());
+  int previous_closed = closed;
+  closed = (alerrno == -1) ? 1 : -1; // Stupid default arguments
+  DDebug("cache_close", "do_io_close %p %d %d", this, alerrno, closed);
+
+  // special case: to cache 0 bytes document
+  if (f.force_empty)
+    closed = 1;
+
+  if (!remote_closed) {
+    if (closed > 0 && vio.op == VIO::WRITE) {
+      if ((f.update && vio.nbytes == 0) || f.force_empty) {
+        // header only update
+        if (frag_type == CACHE_FRAG_TYPE_HTTP) {
+          if (alternate.valid()) {
+            SetIOCloseMessage msg;
+            msg.h_len = alternate.marshal_length();
+            msg.d_len = 0;
+            msg.total_len = 0;
+
+            // Cap at MAX_BUFFER_SIZE_INDEX like do_io_write(): without an
+            // explicit cap the index is limited by the default maximum, and a
+            // large header would overflow the buffer in memcpy()/marshal().
+            Ptr<IOBufferData> d;
+            d = new_IOBufferData(iobuffer_size_to_index(sizeof msg + msg.h_len, MAX_BUFFER_SIZE_INDEX));
+            char *data = d->data();
+            memcpy(data, &msg, sizeof msg);
+
+            int res = alternate.marshal((char *) data + sizeof msg, msg.h_len);
+            ink_assert(res >= 0 && res <= msg.h_len);
+
+            IOBufferBlock *ret = new_IOBufferBlock(d, sizeof msg + msg.h_len, 0);
+            ret->_buf_end = ret->_end;
+
+            remote_closed = cluster_send_message(cs,
+                -CLUSTER_CACHE_HEADER_ONLY_UPDATE, ret, -1, PRIORITY_HIGH);
+          } else
+            remote_closed = cluster_send_message(cs, -CLUSTER_CACHE_DATA_CLOSE, &total_len,
+                sizeof total_len, PRIORITY_HIGH);
+        } else {
+          remote_closed = cluster_send_message(cs, -CLUSTER_CACHE_DATA_CLOSE,
+              &total_len, sizeof total_len, priority);
+        }
+
+        goto Lfree;
+      } else if ((total_len < vio.nbytes) || length > 0) {
+        // Flush whatever the writer still has: same accounting as openWriteMain().
+        int64_t ntodo = vio.ntodo() + length;
+        int64_t total_avail = vio.buffer.reader()->read_avail();
+        int64_t avail = total_avail;
+        int64_t towrite = avail + length;
+        if (towrite > ntodo) {
+          avail -= (towrite - ntodo);
+          towrite = ntodo;
+        }
+
+        if (!blocks && towrite) {
+          blocks = vio.buffer.reader()->block;
+          offset = vio.buffer.reader()->start_offset;
+        }
+
+        if (avail > 0) {
+          vio.buffer.reader()->consume(avail);
+          vio.ndone += avail;
+          total_len += avail;
+        }
+
+        if (vio.ntodo() > 0) {
+          Warning("writer closed success but still want more data");
+          remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0,
+              priority);
+          goto Lfree;
+        }
+
+        length = towrite;
+        ink_assert(total_len == vio.nbytes);
+        int flen = cache_config_target_fragment_size;
+        while (length >= flen) {
+          IOBufferBlock *ret = clone_IOBufferBlockList(blocks, offset, flen);
+          blocks = iobufferblock_skip(blocks, &offset, &length, flen);
+
+          remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, ret,
+              -1, priority);
+          if (remote_closed)
+            goto Lfree;
+
+          data_sent += flen;
+          Debug("data_sent", "sent bytes %d, reminds %" PRId64, flen, length);
+        }
+
+        if (length > 0) {
+          data_sent += length;
+          IOBufferBlock *ret = clone_IOBufferBlockList(blocks, offset, length);
+          blocks = iobufferblock_skip(blocks, &offset, &length, length);
+          remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_WRITE_DONE, ret, -1,
+              priority);
+          if (remote_closed)
+            goto Lfree;
+          Debug("data_sent", "sent bytes done: %" PRId64 ", reminds %" PRId64, data_sent, length);
+        }
+      }
+
+      if (doc_len != vio.nbytes) {
+        // for trunk: the final length differs from the one announced in
+        // do_io_write(), so tell the remote side the real total.
+        // NOTE(review): the other DATA_CLOSE sends above use the negated id
+        // with the same &total_len payload; confirm the positive id is intended.
+        ink_assert(total_len == vio.nbytes && length == 0);
+        remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_CLOSE,
+            &total_len, sizeof total_len, priority);
+        goto Lfree;
+      }
+      ink_assert(data_sent == total_len);
+    }
+
+    if (closed < 0 && vio.op == VIO::WRITE)
+      remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+
+    if (vio.op == VIO::READ && !in_progress) {
+      remote_closed = cluster_send_message(cs, CLUSTER_CACHE_DATA_ABORT, NULL, 0, PRIORITY_HIGH);
+    }
+  }
+Lfree:
+  if (!previous_closed && !recursive && !in_progress) {
+    free_ClusterCacheVC(this);
+  }
+}
+
+// Re-arm the VC after the user consumed or produced data. An immediate event
+// is scheduled on the VIO's thread unless one is already pending (trigger) or
+// a remote request is still outstanding (in_progress).
+void
+ClusterCacheVC::reenable(VIO *avio)
+{
+  DDebug("cache_reenable", "reenable %p, trigger %p, in_progress %d", this, trigger, in_progress);
+  ink_assert(avio->mutex->thread_holding);
+
+  if (trigger != NULL || in_progress)
+    return;
+
+  trigger = avio->mutex->thread_holding->schedule_imm_local(this);
+}
+
+// Re-enable variant that runs the handler synchronously when that is safe:
+// nothing pending, no remote request outstanding, and not already inside a
+// callback. Inside a callback it schedules an immediate event instead.
+void
+ClusterCacheVC::reenable_re(VIO *avio)
+{
+  DDebug("cache_reenable", "reenable %p", this);
+  ink_assert(avio->mutex->thread_holding);
+
+  if (trigger || in_progress)
+    return;
+
+  if (recursive)
+    trigger = avio->mutex->thread_holding->schedule_imm_local(this);
+  else
+    handleEvent(EVENT_NONE, (void *) 0);
+}
// End of ClusterVConnection.cc
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/EventPoll.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/EventPoll.cc b/iocore/cluster/EventPoll.cc
new file mode 100644
index 0000000..8a6b9e6
--- /dev/null
+++ b/iocore/cluster/EventPoll.cc
@@ -0,0 +1,158 @@
+/** @file
+
+ A brief file description
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#include "EventPoll.h"
+
+// Create the OS event queue selected at build time (epoll, kqueue or event
+// ports) and an array holding up to 'size' ready events per poll().
+// 'timeout' is in milliseconds: it is used as-is for epoll_wait() and split
+// into a timespec for kqueue and event ports. Every registration gets
+// _extra_events added (EPOLLET for epoll, INK_EV_EDGE_TRIGGER for kqueue,
+// nothing for event ports).
+// NOTE(review): failures of epoll_create()/kqueue()/port_create() are not
+// checked; a negative _poll_fd only surfaces as errors from attach()/poll().
+EventPoll::EventPoll(const int size, int timeout) : _size(size)
+{
+ int bytes;
+
+#if TS_USE_EPOLL
+ _extra_events = EPOLLET;
+ _timeout = timeout;
+ _poll_fd = epoll_create(_size);
+ bytes = sizeof(struct epoll_event) * size;
+ _events = (struct epoll_event *)ats_malloc(bytes);
+#elif TS_USE_KQUEUE
+ _extra_events = INK_EV_EDGE_TRIGGER;
+ _timeout.tv_sec = timeout / 1000;
+ _timeout.tv_nsec = 1000000 * (timeout % 1000);
+ _poll_fd = kqueue();
+ bytes = sizeof(struct kevent) * size;
+ _events = (struct kevent *)ats_malloc(bytes);
+#elif TS_USE_PORT
+ _extra_events = 0;
+ _timeout.tv_sec = timeout / 1000;
+ _timeout.tv_nsec = 1000000 * (timeout % 1000);
+ _poll_fd = port_create();
+ bytes = sizeof(port_event_t) * size;
+ _events = (port_event_t *)ats_malloc(bytes);
+#endif
+}
+
+// Release the ready-event array and close the queue descriptor.
+// NOTE(review): _poll_fd is closed unconditionally, even if creating the
+// queue failed in the constructor.
+EventPoll::~EventPoll()
+{
+ ats_free(_events);
+ close(_poll_fd);
+}
+
+// Register fd for events 'e'. 'data' is the user pointer later returned by
+// getData(). epoll: 'e' are epoll event bits (plus _extra_events). kqueue:
+// EVENTIO_READ / EVENTIO_WRITE each add the matching filter. Event ports:
+// port_associate() with 'e' as the event mask. Returns the result of the
+// underlying system call.
+// NOTE(review): event-port associations are one-shot (port_associate(3C));
+// confirm the caller re-associates after each delivered event.
+int EventPoll::attach(const int fd, const int e, void *data)
+{
+#if TS_USE_EPOLL
+ struct epoll_event ev;
+ memset(&ev, 0, sizeof(ev));
+ ev.events = e | _extra_events;
+ ev.data.ptr = data;
+ return epoll_ctl(_poll_fd, EPOLL_CTL_ADD, fd, &ev);
+#elif TS_USE_KQUEUE
+ struct kevent ev[2];
+ int n = 0;
+ if (e & EVENTIO_READ) {
+ EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | _extra_events, 0, 0, data);
+ }
+ if (e & EVENTIO_WRITE) {
+ EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | _extra_events, 0, 0, data);
+ }
+ return kevent(_poll_fd, ev, n, NULL, 0, NULL);
+#elif TS_USE_PORT
+ return port_associate(_poll_fd, PORT_SOURCE_FD, fd, e, data);
+#endif
+}
+
+// Replace the event mask of an already attached fd with 'e' (epoll / event
+// port bits, or EVENTIO_READ / EVENTIO_WRITE for kqueue). Returns 0 on
+// success or -1 with errno set.
+int EventPoll::modify(const int fd, const int e, void *data)
+{
+#if TS_USE_EPOLL
+  struct epoll_event ev;
+  memset(&ev, 0, sizeof(ev));
+  ev.events = e | _extra_events;
+  ev.data.ptr = data;
+  return epoll_ctl(_poll_fd, EPOLL_CTL_MOD, fd, &ev);
+#elif TS_USE_KQUEUE
+  // kqueue keeps one registration per filter. Without an event list,
+  // kevent() stops at the first change that fails, and EV_DELETE of a filter
+  // that is not registered fails with ENOENT. Apply the additions first, then
+  // delete unwanted filters one at a time, treating ENOENT as "already gone",
+  // so a missing filter can never prevent the other change.
+  struct kevent ev[2];
+  int n = 0;
+  if (e & EVENTIO_READ) {
+    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | _extra_events, 0, 0, data);
+  }
+  if (e & EVENTIO_WRITE) {
+    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | _extra_events, 0, 0, data);
+  }
+  if (n > 0 && kevent(_poll_fd, ev, n, NULL, 0, NULL) < 0) {
+    return -1;
+  }
+
+  struct kevent del;
+  if (!(e & EVENTIO_READ)) {
+    EV_SET(&del, fd, EVFILT_READ, EV_DELETE, 0, 0, data);
+    if (kevent(_poll_fd, &del, 1, NULL, 0, NULL) < 0 && errno != ENOENT) {
+      return -1;
+    }
+  }
+  if (!(e & EVENTIO_WRITE)) {
+    EV_SET(&del, fd, EVFILT_WRITE, EV_DELETE, 0, 0, data);
+    if (kevent(_poll_fd, &del, 1, NULL, 0, NULL) < 0 && errno != ENOENT) {
+      return -1;
+    }
+  }
+  return 0;
+#elif TS_USE_PORT
+  // Re-associating an fd replaces its previous event mask.
+  return port_associate(_poll_fd, PORT_SOURCE_FD, fd, e, data);
+#endif
+}
+
+// Unregister fd from the event queue. Returns the result of the underlying
+// system call. On kqueue this is a no-op returning 0.
+// NOTE(review): the kqueue case presumably relies on the kernel dropping an
+// fd's kevents when it is closed -- confirm callers close fd after detach().
+int EventPoll::detach(const int fd)
+{
+#if TS_USE_EPOLL
+ return epoll_ctl(_poll_fd, EPOLL_CTL_DEL, fd, NULL);
+#elif TS_USE_PORT
+ return port_dissociate(_poll_fd, PORT_SOURCE_FD, fd);
+#else
+ return 0;
+#endif
+}
+
+// Wait (up to the timeout given to the constructor) for ready events.
+// Returns the number of ready events, readable via getEvents()/getData() for
+// indexes [0, n); 0 on timeout; or -1 on error.
+int EventPoll::poll()
+{
+#if TS_USE_EPOLL
+  return epoll_wait(_poll_fd, _events, _size, _timeout);
+#elif TS_USE_KQUEUE
+  return kevent(_poll_fd, NULL, 0, _events, _size, &_timeout);
+#elif TS_USE_PORT
+  // port_getn() waits for at least 'nget' (1) events. On EINTR / EAGAIN /
+  // ETIME it may still have retrieved some; nget holds how many (possibly 0).
+  unsigned nget = 1;
+  int result;
+  if (port_getn(_poll_fd, _events, _size, &nget, &_timeout) == 0) {
+    result = (int) nget;
+  } else {
+    switch (errno) {
+    case EINTR:
+    case EAGAIN:
+    case ETIME:
+      result = (int) nget;
+      break;
+    default:
+      result = -1;
+      break;
+    }
+  }
+  return result;
+#else
+#error port me
+#endif
+}
+
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/EventPoll.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/EventPoll.h b/iocore/cluster/EventPoll.h
new file mode 100644
index 0000000..ccb65de
--- /dev/null
+++ b/iocore/cluster/EventPoll.h
@@ -0,0 +1,105 @@
+/** @file
+
+ A brief file description
+
+ @section license License
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+#ifndef __EVENT_POLL_H__
+#define __EVENT_POLL_H__
+
+#include "P_Net.h"
+
+// Thin portability wrapper over the readiness-notification facility chosen at
+// build time: epoll (TS_USE_EPOLL), kqueue (TS_USE_KQUEUE) or Solaris event
+// ports (TS_USE_PORT). It owns the queue descriptor and the array that poll()
+// fills with ready events.
+class EventPoll {
+ public:
+  // size: capacity of the ready-event array (max events per poll());
+  // timeout: poll() timeout in milliseconds.
+  EventPoll(const int size, int timeout);
+  ~EventPoll();
+  // Register fd for events 'e' with user pointer 'data'.
+  int attach(const int fd, const int e, void *data);
+  // Replace the event mask of an attached fd.
+  int modify(const int fd, const int e, void *data);
+  // Unregister fd.
+  int detach(const int fd);
+  // Wait for events: ready count, 0 on timeout, -1 on error.
+  int poll();
+
+#if TS_USE_KQUEUE
+  /* we define these here as numbers, because for kqueue mapping them to a combination of
+   * filters / flags is hard to do. */
+  inline int kq_event_convert(int16_t event, uint16_t flags)
+  {
+    int r = 0;
+
+    if (event == EVFILT_READ) {
+      r |= INK_EVP_IN;
+    }
+    else if (event == EVFILT_WRITE) {
+      r |= INK_EVP_OUT;
+    }
+
+    if (flags & EV_EOF) {
+      r |= INK_EVP_HUP;
+    }
+    return r;
+  }
+#endif
+
+  // Event bits of the index-th ready event from the last poll().
+  inline int getEvents(const int index)
+  {
+#if TS_USE_EPOLL
+    return _events[index].events;
+#elif TS_USE_KQUEUE
+    return kq_event_convert(_events[index].filter, _events[index].flags);
+#elif TS_USE_PORT
+    return _events[index].portev_events;
+#else
+#error port me
+#endif
+  }
+
+  // User pointer registered with attach()/modify() for the index-th ready event.
+  inline void *getData(const int index)
+  {
+#if TS_USE_EPOLL
+    return _events[index].data.ptr;
+#elif TS_USE_KQUEUE
+    return _events[index].udata;
+#elif TS_USE_PORT
+    return _events[index].portev_user;
+#else
+#error port me
+#endif
+  }
+
+ protected:
+  int _size; //max events (fd)
+  int _extra_events;
+  int _poll_fd;
+
+#if TS_USE_EPOLL
+  struct epoll_event *_events;
+  int _timeout;
+#elif TS_USE_KQUEUE
+  struct kevent *_events;
+  struct timespec _timeout;
+#elif TS_USE_PORT
+  port_event_t *_events;
+  timespec_t _timeout;
+#endif
+
+ private:
+  // _poll_fd and _events are released in the destructor, so a copy would
+  // close/free them twice. Copying is disabled: declared, never defined.
+  EventPoll(const EventPoll &);
+  EventPoll &operator=(const EventPoll &);
+};
+
+#endif
+
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/Makefile.am
----------------------------------------------------------------------
diff --git a/iocore/cluster/Makefile.am b/iocore/cluster/Makefile.am
index 1d8266d..b49046d 100644
--- a/iocore/cluster/Makefile.am
+++ b/iocore/cluster/Makefile.am
@@ -57,5 +57,12 @@ libinkcluster_a_SOURCES = \
P_ClusterLoadMonitor.h \
P_ClusterMachine.h \
P_TimeTrace.h \
- Inline.cc
+ Inline.cc \
+ global.cc \
+ nio.cc \
+ session.cc \
+ message.cc \
+ connection.cc \
+ machine.cc \
+ EventPoll.cc
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/P_Cluster.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_Cluster.h b/iocore/cluster/P_Cluster.h
index aa3d6a5..f24d3de 100644
--- a/iocore/cluster/P_Cluster.h
+++ b/iocore/cluster/P_Cluster.h
@@ -123,6 +123,8 @@ enum
cluster_stat_count
};
+// 1 MiB minus 128 bytes. NOTE(review): presumably a fragment size that leaves
+// headroom for message headers -- confirm. The misspelled name ("FRAGEMENT")
+// is kept because other files may already reference it.
+#define SIZE_OF_FRAGEMENT ((1 << 20) - 128)
+
extern RecRawStatBlock *cluster_rsb;
#define CLUSTER_INCREMENT_DYN_STAT(x) \
RecIncrRawStat(cluster_rsb, mutex->thread_holding, (int) x, 1);
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/P_ClusterCache.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterCache.h b/iocore/cluster/P_ClusterCache.h
index 10fc46a..162fef0 100644
--- a/iocore/cluster/P_ClusterCache.h
+++ b/iocore/cluster/P_ClusterCache.h
@@ -53,6 +53,9 @@
/****************************************************************************/
#include "P_ClusterMachine.h"
+#include "clusterinterface.h"
+
+extern int enable_cache_empty_http_doc;
//
// Cluster Processor
@@ -310,6 +313,7 @@ struct ClusterVCToken
//
typedef void ClusterFunction(ClusterHandler * ch, void *data, int len);
typedef ClusterFunction *ClusterFunctionPtr;
+// Cluster-function signature taking a ClusterSession (from clusterinterface.h)
+// and a context pointer, instead of the ClusterHandler taken by ClusterFunction.
+typedef void ClusterFunctionExt(ClusterSession cs, void *context, void *data);
struct ClusterVConnectionBase;
@@ -512,7 +516,9 @@ struct ClusterVConnection: public ClusterVConnectionBase
ClusterVConnection(int is_new_connect_read = 0);
~ClusterVConnection();
void free(); // Destructor actions (we are using ClassAllocator)
-
+ // Always false for ClusterVConnection. NOTE(review): presumably overrides a
+ // base-class hook of the same name -- confirm in CacheVConnection.
+ virtual bool is_read_from_writer() {
+ return false;
+ }
virtual void do_io_close(int lerrno = -1);
virtual VIO *do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf);
virtual VIO *do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * buf, bool owner = false);
@@ -739,9 +745,9 @@ extern ClusterFunction close_channel_ClusterFunction;
extern ClusterFunction get_hostinfo_ClusterFunction;
extern ClusterFunction put_hostinfo_ClusterFunction;
extern ClusterFunction cache_lookup_ClusterFunction;
-extern ClusterFunction cache_op_ClusterFunction;
+//extern ClusterFunction cache_op_ClusterFunction;
extern ClusterFunction cache_op_malloc_ClusterFunction;
-extern ClusterFunction cache_op_result_ClusterFunction;
+//extern ClusterFunction cache_op_result_ClusterFunction;
extern ClusterFunction set_channel_data_ClusterFunction;
extern ClusterFunction post_setchan_send_ClusterFunction;
extern ClusterFunction set_channel_pin_ClusterFunction;
@@ -750,6 +756,9 @@ extern ClusterFunction set_channel_priority_ClusterFunction;
extern ClusterFunction post_setchan_priority_ClusterFunction;
extern ClusterFunction default_api_ClusterFunction;
+// The cache-op request/result handlers now use the ClusterFunctionExt
+// signature. Their old ClusterFunction declarations are commented out above,
+// because a name cannot be redeclared with a different function type.
+extern ClusterFunctionExt cache_op_ClusterFunction;
+extern ClusterFunctionExt cache_op_result_ClusterFunction;
+
struct ClusterFunctionDescriptor
{
bool fMalloced; // the function will free the data
@@ -767,7 +776,8 @@ struct ClusterFunctionDescriptor
#ifndef DEFINE_CLUSTER_FUNCTIONS
extern
#endif
-ClusterFunctionDescriptor clusterFunction[]
+ClusterFunctionDescriptor clusterFunction[1]
+#if 0
#ifdef DEFINE_CLUSTER_FUNCTIONS
= {
{false, true, CMSG_LOW_PRI, test_ClusterFunction, 0},
@@ -863,7 +873,7 @@ ClusterFunctionDescriptor clusterFunction[]
// ********** ADD NEW ENTRIES ABOVE THIS LINE ************
}
#endif
-
+#endif
;
extern unsigned SIZE_clusterFunction; // clusterFunction[] entries
@@ -983,10 +993,27 @@ ClusterFuncToQpri(int cluster_func)
#define API_F29_CLUSTER_FUNCTION 79
#define API_F30_CLUSTER_FUNCTION 80
-#define API_STARECT_CLUSTER_FUNCTION API_F01_CLUSTER_FUNCTION
-#define API_END_CLUSTER_FUNCTION API_F30_CLUSTER_FUNCTION
+// Message ids of the session-based cluster cache protocol (ClusterCacheVC,
+// cluster_main_handler). Ids may be sent negated; cluster_main_handler
+// dispatches on the absolute value.
+#define CLUSTER_CACHE_OP_CLUSTER_FUNCTION (CLUSTER_MSG_START+81)
+#define CLUSTER_CACHE_DATA_READ_BEGIN (CLUSTER_MSG_START+82)
+#define CLUSTER_CACHE_DATA_READ_REENABLE (CLUSTER_MSG_START+83)
+#define CLUSTER_CACHE_DATA_WRITE_BEGIN (CLUSTER_MSG_START+84)
+#define CLUSTER_CACHE_HEADER_ONLY_UPDATE (CLUSTER_MSG_START+85)
+#define CLUSTER_CACHE_DATA_CLOSE (CLUSTER_MSG_START+86)
+#define CLUSTER_CACHE_DATA_ABORT (CLUSTER_MSG_START+87)
+#define CLUSTER_CACHE_DATA_WRITE_DONE (CLUSTER_MSG_START+88)
+
+// Replies from the remote node.
+#define CLUSTER_CACHE_OP_RESULT_CLUSTER_FUNCTION (CLUSTER_MSG_START+89)
+#define CLUSTER_CACHE_DATA_READ_DONE (CLUSTER_MSG_START+90)
+#define CLUSTER_CACHE_DATA_ERROR (CLUSTER_MSG_START+91)
-#define UNDEFINED_CLUSTER_FUNCTION 0xFDEFFDEF
+#define CLUSTER_INTERNEL_ERROR (CLUSTER_MSG_START+100)
+#define CLUSTER_PING_CLUSTER_FUNCTION (CLUSTER_MSG_START+101)
+#define CLUSTER_PING_REPLY_CLUSTER_FUNCTION (CLUSTER_MSG_START+102)
+
+#define API_STARECT_CLUSTER_FUNCTION API_F01_CLUSTER_FUNCTION
+#define API_END_CLUSTER_FUNCTION API_F30_CLUSTER_FUNCTION
+
+#define UNDEFINED_CLUSTER_FUNCTION 0xFDEFFDEF
//////////////////////////////////////////////
// Initial cluster connect exchange message
@@ -1171,4 +1198,328 @@ ClusterVC_remove_write(ClusterVConnectionBase * vc)
}
+struct ClusterCacheVC: public CacheVConnection
+{
+ static int size_to_init; // bytes zeroed starting at &vio when the VC is freed (free_ClusterCacheVC)
+ Action _action;
+ Ptr<IOBufferData> buf; // for read
+ Ptr<IOBufferData> first_buf; // the head fragment
+ Ptr<IOBufferBlock> blocks; // data available to write
+
+ CacheHTTPInfo alternate;
+
+ VIO vio; // start of the range memset to zero on free: keep members from here down plain data
+ ink_hrtime start_time;
+ CacheFragType frag_type;
+ int64_t seek_to; // pread offset
+ int64_t offset; // offset into 'blocks' of data to write
+ int64_t length; // length of data available to write
+ int64_t total_len;
+ int64_t data_sent;
+ int64_t doc_len;
+
+ int doc_pos; // read position in 'buf'
+ int d_len; // the length of data in 'buf'
+
+ int closed; // together with in_progress == false, makes calluser()/callcont() free the VC
+ int recursive; // nesting depth of calluser()/callcont() callbacks
+ int disk_io_priority;
+ int probe_depth;
+ MessagePriority priority;
+
+ time_t time_pin;
+ EThread *initial_thread; // initial thread open_XX was called on
+ ClusterSession cs; // closed by free_ClusterCacheVC() unless session_closed is set
+ Event *trigger; // scheduled event; cancelled by cancel_trigger() and on free
+ ContinuationHandler save_handler;
+
+
+ bool in_progress; // while set, calluser()/callcont() do not free the VC
+ bool remote_closed;
+ bool session_closed;
+
+ union
+ {
+ uint32_t flags;
+ struct
+ {
+ unsigned int use_first_key:1;
+ unsigned int overwrite:1; // overwrite first_key Dir if it exists
+ unsigned int close_complete:1; // WRITE_COMPLETE is final
+ unsigned int sync:1; // write to be committed to durable storage before WRITE_COMPLETE
+ unsigned int evacuator:1;
+ unsigned int single_fragment:1;
+ unsigned int evac_vector:1;
+ unsigned int lookup:1;
+ unsigned int update:1;
+ unsigned int remove:1;
+ unsigned int remove_aborted_writers:1;
+ unsigned int open_read_timeout:1; // UNUSED
+ unsigned int data_done:1;
+ unsigned int read_from_writer_called:1;
+ unsigned int not_from_ram_cache:1; // entire object was from ram cache
+ unsigned int rewrite_resident_alt:1;
+ unsigned int readers:1;
+ unsigned int doc_from_ram_cache:1;
+#ifdef HIT_EVACUATE
+ unsigned int hit_evacuate:1;
+#endif
+#ifdef HTTP_CACHE
+ unsigned int force_empty:1; // used for cache empty http document
+#endif
+#ifdef SSD_CACHE
+ unsigned int read_from_ssd:1;
+ unsigned int write_into_ssd:1;
+ unsigned int ram_fixup:1;
+ unsigned int transistor:1;
+#endif
+ } f;
+ };
+ ClusterCacheVC();
+ VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf); // invoke remote
+ VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset); // invoke remote
+ VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false); // invoke remote
+ void do_io_close(int lerrno = -1); // invoke remote ?
+ void reenable(VIO *avio); // invoke remote ?
+ void reenable_re(VIO *avio); // invoke remote ?
+
+ void do_remote_close(); // invoke remote, for cancel or error
+
+ virtual int get_header(void **, int *) // unsupported on a cluster VC: asserts, returns -1
+ {
+ ink_assert(!"not implemented");
+ return -1;
+ }
+ virtual int set_header(void *, int) // unsupported on a cluster VC: asserts, returns -1
+ {
+ ink_assert(!"not implemented");
+ return -1;
+ }
+ virtual int get_single_data(void **, int *) // unsupported on a cluster VC: asserts, returns -1
+ {
+ ink_assert(!"not implemented");
+ return -1;
+ }
+
+#ifdef HTTP_CACHE
+ virtual void set_http_info(CacheHTTPInfo *info) { // takes a shallow copy of 'info', then clears the caller's copy
+ if (enable_cache_empty_http_doc) {
+ MIMEField *field = info->m_alt->m_response_hdr.field_find(
+ MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH);
+ if (field && !field->value_get_int64())
+ f.force_empty = 1;
+ else
+ f.force_empty = 0;
+ } else
+ f.force_empty = 0;
+ alternate.copy_shallow(info);
+ info->clear();
+ }
+ virtual void get_http_info(CacheHTTPInfo ** info) {
+ *info = &alternate;
+ }
+#endif
+
+ bool is_ram_cache_hit() { // NOTE(review): non-const twin of the const virtual below; only this one asserts a READ vio
+ ink_assert(vio.op == VIO::READ);
+ return !f.not_from_ram_cache;
+ }
+ virtual bool set_disk_io_priority(int priority)
+ {
+ disk_io_priority = priority;
+ return true;
+ }
+ virtual int get_disk_io_priority() {
+ return disk_io_priority;
+ }
+ virtual bool set_pin_in_cache(time_t t) {
+ time_pin = t;
+ return true;
+ }
+ virtual time_t get_pin_in_cache() {
+ return time_pin;
+ }
+ virtual int64_t get_object_size()
+ {
+ return alternate.object_size_get();
+ }
+ virtual bool is_read_from_writer()
+ {
+ return f.read_from_writer_called;
+ }
+ virtual bool is_ram_cache_hit() const
+ {
+ return !f.not_from_ram_cache;
+ }
+ virtual bool is_pread_capable()
+ {
+ return true;
+ }
+ void
+ cancel_trigger() // cancel and forget any scheduled 'trigger' event
+ {
+ if (trigger) {
+ trigger->cancel_action();
+ trigger = NULL;
+ }
+ }
+
+ int calluser(int event);
+ int callcont(int event);
+ int handleRead(int event, void *data);
+ int openReadReadDone(int event, void *data);
+// int handleWrite(int event, void *data);
+// int openWriteWriteDone(int event, void *data);
+ int openReadStart(int event, void *data);
+ int openWriteStart(int event, void *data);
+ int openReadMain(int event, void *data);
+ int openWriteMain(int event, void *data);
+ int removeEvent(int event, void *data);
+};
+
+
+
+struct SetIOReadMessage: public ClusterMessageHeader // NOTE(review): presumably the remote side of do_io_read/do_io_pread — confirm
+{
+ int64_t nbytes;
+ int64_t offset;
+};
+
+struct SetIOWriteMessage: public ClusterMessageHeader
+{
+ int32_t hdr_len; // NOTE(review): padding likely follows before the int64_t; matters if sent as raw bytes
+ int64_t nbytes;
+};
+
+struct SetIOCloseMessage: public ClusterMessageHeader
+{
+ int h_len;
+ int d_len;
+ int64_t total_len;
+};
+
+struct SetIOReenableMessage: public ClusterMessageHeader
+{
+ int reenable;
+};
+struct SetResponseMessage: public ClusterMessageHeader // adds no fields beyond ClusterMessageHeader
+{
+
+};
+
+inline IOBufferBlock * // clone 'len' bytes of the chain 'ab', starting 'offset' bytes in; NULL if nothing to clone
+clone_IOBufferBlockList(IOBufferBlock *ab, int64_t offset, int64_t len)
+{
+ IOBufferBlock *b = ab;
+ IOBufferBlock *head = NULL;
+ IOBufferBlock *clone = NULL;
+
+ while (b && len > 0) { // stop once 'len' bytes are covered (no trailing zero-length clone)
+ int64_t max_bytes = b->read_avail();
+ max_bytes -= offset;
+ if (max_bytes <= 0) { // 'offset' skips this whole block
+ offset = -max_bytes;
+ b = b->next;
+ continue;
+ }
+
+ if (!head) {
+ head = b->clone();
+ head->consume(offset); // only the first clone starts mid-block
+ clone = head;
+ } else {
+ clone->next = b->clone();
+ clone = clone->next;
+ }
+
+ len -= max_bytes;
+ b = b->next;
+ offset = 0;
+ }
+ if (clone && len < 0)
+ clone->fill(len); // len < 0: shrink the last clone by the overshoot (assumes fill() accepts a negative count)
+ return head;
+}
+
+ClusterCacheVC *new_ClusterCacheVC(); // NOTE(review): zero-argument form; the inline definition below takes a Continuation * — confirm this overload exists
+void free_ClusterCacheVC(ClusterCacheVC *ccvc); // declared early because calluser()/callcont() call it before its inline definition
+
+inline int // deliver 'event' to the VIO's continuation; frees this VC afterwards if it was closed meanwhile
+ClusterCacheVC::calluser(int event)
+{
+ recursive++; // nesting depth while the user callback runs
+ ink_assert(this_ethread() == vio._cont->mutex->thread_holding);
+ vio._cont->handleEvent(event, (void *) &vio);
+ recursive--;
+ if (closed && !in_progress) { // closed during the callback and in_progress is clear
+ free_ClusterCacheVC(this);
+ return EVENT_DONE; // 'this' is freed: no member access past this point
+ }
+ return EVENT_CONT;
+}
+
+inline int // deliver 'event' (with this VC) to _action.continuation; always returns EVENT_DONE
+ClusterCacheVC::callcont(int event)
+{
+ recursive++;
+ ink_assert(this_ethread() == _action.mutex->thread_holding);
+ _action.continuation->handleEvent(event, this);
+ recursive--;
+ if (closed && !in_progress) {
+ free_ClusterCacheVC(this); // 'this' is invalid after this call
+ return EVENT_DONE;
+ } else if (vio.vc_server) // NOTE(review): presumably set when the callback started I/O on this VC — confirm
+ handleEvent(EVENT_IMMEDIATE, 0); // re-dispatch through the current handler
+ return EVENT_DONE;
+}
+
+extern ClassAllocator<ClusterCacheVC> clusterCacheVCAllocator; // pool used by new_/free_ClusterCacheVC
+
+inline ClusterCacheVC * // allocate a VC bound to 'cont': its action, its mutex and the thread holding that mutex
+new_ClusterCacheVC(Continuation *cont)
+{
+ ClusterCacheVC *vc = clusterCacheVCAllocator.alloc();
+
+ vc->_action = cont;
+ vc->initial_thread = cont->mutex->thread_holding;
+ vc->mutex = cont->mutex;
+ vc->start_time = ink_get_hrtime();
+ ink_assert(vc->trigger == NULL); // a recycled VC must not carry a scheduled event
+
+ Debug("cluster_cache_new", "new %p", vc);
+ return vc;
+}
+
+inline void // tear down 'cont' and return it to clusterCacheVCAllocator; caller must hold cont->mutex
+free_ClusterCacheVC(ClusterCacheVC *cont)
+{
+ Debug("cluster_cache_free", "free %p", cont);
+ ink_assert(cont->mutex->thread_holding == this_ethread());
+
+ if (cont->trigger)
+ cont->trigger->cancel(); // pointer not reset here; trigger is declared after vio, so presumably zeroed by the memset below — confirm size_to_init covers it
+ ink_assert(!cont->in_progress);
+
+ if (!cont->session_closed)
+ cluster_close_session(cont->cs);
+
+ cont->vio.buffer.clear(); // drop vio's references before the memset overwrites them
+ cont->vio.mutex.clear();
+#ifdef HTTP_CACHE
+ if (cont->vio.op == VIO::WRITE) // a writer destroys its alternate; a reader only clears it
+ cont->alternate.destroy();
+ else
+ cont->alternate.clear();
+#endif
+ cont->_action.cancelled = 0;
+ cont->_action.mutex.clear();
+ cont->mutex.clear();
+ cont->buf.clear();
+ cont->first_buf.clear();
+ cont->blocks.clear();
+
+ memset((char *) &cont->vio, 0, cont->size_to_init); // zero size_to_init bytes starting at 'vio'
+
+ clusterCacheVCAllocator.free(cont);
+}
#endif /* _Cluster_h */
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/62504a9f/iocore/cluster/P_ClusterCacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cluster/P_ClusterCacheInternal.h b/iocore/cluster/P_ClusterCacheInternal.h
index 8b62d44..44b675c 100644
--- a/iocore/cluster/P_ClusterCacheInternal.h
+++ b/iocore/cluster/P_ClusterCacheInternal.h
@@ -81,6 +81,88 @@ extern int ET_CLUSTER;
#define PROBE_LOCAL_CACHE_FIRST DO_REPLICATION
#define PROBE_LOCAL_CACHE_LAST false
+struct ClusterCont: public Continuation // NOTE(review): appears to carry one cluster message (func_id + payload) to its handler
+{
+ ClusterSession session;
+ Ptr<IOBufferBlock> data; // payload block chain
+ void *context;
+ int func_id; // selects the dispatch in handleEvent()
+ int data_len; // byte count of 'data'; reduced by consume()
+
+ Action _action;
+ int handleEvent(int event, void *d); // dispatches on func_id, then frees this object
+ IOBufferData *copy_data();
+ int copy_data(char *buf, int size);
+ void consume(int size);
+};
+
+inline IOBufferData *
+ClusterCont::copy_data() { // flatten up to data_len bytes of 'data' into one new IOBufferData
+ IOBufferData *buf = new_IOBufferData(iobuffer_size_to_index(data_len, MAX_BUFFER_SIZE_INDEX));
+ char *p = buf->data(), *end = p + data_len; // bound the copy by data_len, the size requested for buf
+ for (IOBufferBlock *b = data; b && p < end; b = b->next) {
+ int64_t n = b->read_avail() < end - p ? b->read_avail() : end - p;
+ memcpy(p, b->_start, n); p += n;
+ }
+ return buf;
+}
+
+inline void
+ClusterCont::consume(int size) { // drop 'size' bytes from the front of 'data'; data_len shrinks likewise, floored at 0
+ int64_t remaining = size;
+ for (; data; data = data->next) {
+ int64_t avail = data->read_avail();
+ if (remaining < avail) {
+ data->_start += remaining; // partial block: advance its read pointer
+ break;
+ }
+ remaining -= avail; // whole block consumed: move on to the next one
+ }
+ data_len = (size < data_len) ? data_len - size : 0;
+}
+
+inline int // copy up to 'len' bytes from the front of 'data' into 'buf' without consuming; returns bytes copied
+ClusterCont::copy_data(char *buf, int len)
+{
+ ink_assert(data_len >= len);
+ IOBufferBlock *b = data;
+ int64_t sz = len; // bytes still to copy
+ while (sz > 0 && b) { // test the remaining count, not the unchanging 'len'
+ int64_t avail = b->read_avail();
+ sz -= avail;
+ if (sz < 0) { // this block holds more than needed: copy only the remainder
+ memcpy(buf, b->_start, avail + sz);
+ sz = 0;
+ break;
+ } else {
+ memcpy(buf, b->_start, avail);
+ buf += avail;
+ b = b->next;
+ }
+ }
+ return len - (int) sz;
+}
+extern ClassAllocator<ClusterCont> clusterContAllocator;
+
+inline int
+ClusterCont::handleEvent(int, void *) { // dispatch on func_id, then return this object to clusterContAllocator
+ if (func_id == CLUSTER_CACHE_OP_CLUSTER_FUNCTION)
+ cache_op_ClusterFunction(session, context, this);
+ else if (func_id == CLUSTER_CACHE_OP_RESULT_CLUSTER_FUNCTION)
+ cache_op_result_ClusterFunction(session, context, this);
+ else if (func_id == CLUSTER_INTERNEL_ERROR)
+ _action.continuation->handleEvent(func_id, NULL); // error ids are delivered without this object
+ else
+ _action.continuation->handleEvent(func_id, this); // handlers must not keep 'this': it is freed below
+
+ mutex.clear();
+ _action.mutex.clear();
+ data = NULL; // drop the payload reference before recycling
+
+ clusterContAllocator.free(this);
+ return EVENT_DONE; // 'this' has been freed
+}
+
//
// This continuation handles all cache cluster traffic, on both
// sides (state machine client and cache server)
@@ -89,111 +171,91 @@ struct CacheContinuation;
typedef int (CacheContinuation::*CacheContHandler) (int, void *);
struct CacheContinuation:public Continuation
{
+ static int size_to_init; // byte count the constructor zeroes, from 'cs' to the end of the object
enum
{
MagicNo = 0x92183123
};
int magicno;
- void *callback_data;
- void *callback_data_2;
INK_MD5 url_md5;
- Event *timeout;
- Action action;
- ClusterMachine *target_machine;
- int probe_depth;
+
ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH];
+
+ ClusterVCToken token;
+
+ CacheHTTPInfo cache_vc_info; // for get_http_info
+// MIOBuffer doc_data;
+ Ptr<IOBufferBlock> doc_data;
+ // Incoming data generated from unmarshaling request/response ops
+ Ptr<IOBufferData> rw_buf_msg;
+ Arena ic_arena;
+ CacheHTTPHdr ic_request; // for lookup or read
+ CacheHTTPInfo ic_old_info; // for update
+ CacheHTTPInfo ic_new_info; // for set_http_info
+
+ ClusterSession cs; // the constructor memsets from here to the end: keep following members plain data
+ char *ic_hostname;
+ int ic_hostname_len;
+
ink_hrtime start_time;
- ClusterMachine *from;
- ClusterHandler *ch;
- VConnection *cache_vc;
+ ClusterMachine *target_machine;
+ int probe_depth;
+
+ CacheVC *cache_vc; // closed (READ vio) or aborted (otherwise) by free_CacheCont()
+ Action *pending_action; // cancelled by free_CacheCont()
bool cache_read;
+ bool request_purge;
+ bool have_all_data; // all object data in response
+ bool expect_next;
+ bool writer_aborted;
int result; // return event code
int result_error; // error code associated with event
- ClusterVCToken token;
- unsigned int seq_number;
uint16_t cfl_flags; // Request flags; see CFL_XXX defines
+
+ unsigned int seq_number;
CacheFragType frag_type;
- int nbytes;
+ int nbytes; // the msg nbytes
unsigned int target_ip;
int request_opcode;
- bool request_purge;
- bool local_lookup_only;
- bool no_reply_message;
- bool request_timeout; // timeout occurred before
- // op complete
- bool expect_cache_callback;
-
- // remove_and_delete() specific data
- bool use_deferred_callback;
-
- // open_read/write data
-
- time_t pin_in_cache;
-
- // setMsgBufferLen(), allocMsgBuffer() and freeMsgBuffer() data
-
- Ptr<IOBufferData> rw_buf_msg;
+ int header_len;
int rw_buf_msg_len;
- // open data
-
- ClusterVConnection *read_cluster_vc;
- ClusterVConnection *write_cluster_vc;
- int cluster_vc_channel;
- ClusterVCToken open_local_token;
-
- // Readahead on open read specific data
-
- int caller_buf_freebytes; // remote bufsize for
- // initial data
- VIO *readahead_vio;
- IOBufferReader *readahead_reader;
- Ptr<IOBufferBlock> readahead_data;
- bool have_all_data; // all object data in response
-
- CacheHTTPInfo cache_vc_info;
- OneWayTunnel *tunnel;
- Ptr<ProxyMutex> tunnel_mutex;
- CacheContinuation *tunnel_cont;
- bool tunnel_closed;
- Action *cache_action;
- Event *lookup_open_write_vc_event;
-
- // Incoming data generated from unmarshaling request/response ops
-
- Arena ic_arena;
- CacheHTTPHdr ic_request;
- CacheHTTPHdr ic_response;
+ time_t pin_in_cache;
+ int64_t doc_size;
+ int64_t total_length;
+ VIO *vio; //
+ IOBufferReader *reader; // for normal read
CacheLookupHttpConfig *ic_params;
- CacheHTTPInfo ic_old_info;
- CacheHTTPInfo ic_new_info;
- Ptr<IOBufferData> ic_hostname;
- int ic_hostname_len;
-
- // debugging
- int cache_op_ClusterFunction;
-
- int lookupEvent(int event, void *d);
- int probeLookupEvent(int event, void *d);
- int remoteOpEvent(int event, Event * e);
- int replyLookupEvent(int event, void *d);
- int replyOpEvent(int event, VConnection * vc);
- int handleReplyEvent(int event, Event * e);
- int callbackEvent(int event, Event * e);
- int setupVCdataRead(int event, VConnection * vc);
- int VCdataRead(int event, VIO * target_vio);
- int setupReadWriteVC(int, VConnection *);
- ClusterVConnection *lookupOpenWriteVC();
- int lookupOpenWriteVCEvent(int, Event *);
- int localVCsetupEvent(int event, ClusterVConnection * vc);
- void insert_cache_callback_user(ClusterVConnection *, int, void *);
- int insertCallbackEvent(int, Event *);
- void callback_user(int result, void *d);
- void defer_callback_result(int result, void *d);
- int callbackResultEvent(int event, Event * e);
- void setupReadBufTunnel(VConnection *, VConnection *);
- int tunnelClosedEvent(int event, void *);
- int remove_and_delete(int, Event *);
+ MIOBuffer *mbuf; // released with free_MIOBuffer() by free_CacheCont()
+ EThread *thread; // the thread passed to new_CacheCont()
+
+// int lookupEvent(int event, void *d);
+// int probeLookupEvent(int event, void *d);
+// int remoteOpEvent(int event, Event * e);
+// int replyLookupEvent(int event, void *d);
+ int replyOpEvent();
+// int handleReplyEvent(int event, Event * e);
+// int callbackEvent(int event, Event * e);
+ int setupVCdataRead(int event, void *data);
+ int setupVCdataWrite(int event, void *data);
+ int setupVCdataRemove(int event, void *data);
+ int setupVCdataLink(int event, void *data);
+ int setupVCdataDeref(int event, void *data);
+ int VCdataRead(int event, void *data);
+ int VCdataWrite(int event, void *data);
+ int VCSmallDataRead(int event, void *data);
+// int setupReadWriteVC(int, VConnection *);
+// ClusterVConnection *lookupOpenWriteVC();
+// int lookupOpenWriteVCEvent(int, Event *);
+// int localVCsetupEvent(int event, ClusterVConnection * vc);
+// void insert_cache_callback_user(ClusterVConnection *, int, void *);
+// int insertCallbackEvent(int, Event *);
+// void callback_user(int result, void *d);
+// void defer_callback_result(int result, void *d);
+// int callbackResultEvent(int event, Event * e);
+// void setupReadBufTunnel(VConnection *, VConnection *);
+// int tunnelClosedEvent(int event, void *);
+// int remove_and_delete(int, Event *);
inline void setMsgBufferLen(int l, IOBufferData * b = 0) {
@@ -254,66 +316,26 @@ struct CacheContinuation:public Continuation
if (ic_request.valid()) {
ic_request.clear();
}
- if (ic_response.valid()) {
- ic_response.clear();
- }
+// if (ic_response.valid()) {
+// ic_response.clear();
+// }
if (ic_old_info.valid()) {
ic_old_info.destroy();
}
if (ic_new_info.valid()) {
ic_new_info.destroy();
}
- ic_arena.reset();
+// ic_arena.reset();
freeMsgBuffer();
-
- tunnel_mutex = 0;
- readahead_data = 0;
+//
+// tunnel_mutex = 0;
+// readahead_data = 0;
ic_hostname = 0;
}
-CacheContinuation():
- Continuation(NULL),
- magicno(MagicNo),
- callback_data(0),
- callback_data_2(0),
- timeout(0),
- target_machine(0),
- probe_depth(0),
- start_time(0),
- cache_read(false),
- result(0),
- result_error(0),
- seq_number(0),
- cfl_flags(0),
- frag_type(CACHE_FRAG_TYPE_NONE),
- nbytes(0),
- target_ip(0),
- request_opcode(0),
- request_purge(false),
- local_lookup_only(0),
- no_reply_message(0),
- request_timeout(0),
- expect_cache_callback(true),
- use_deferred_callback(0),
- pin_in_cache(0),
- rw_buf_msg_len(0),
- read_cluster_vc(0),
- write_cluster_vc(0),
- cluster_vc_channel(0),
- caller_buf_freebytes(0),
- readahead_vio(0),
- readahead_reader(0),
- have_all_data(false),
- cache_vc_info(),
- tunnel(0),
- tunnel_cont(0),
- tunnel_closed(0),
- lookup_open_write_vc_event(0),
- ic_arena(),
- ic_request(),
- ic_response(), ic_params(0), ic_old_info(), ic_new_info(), ic_hostname_len(0), cache_op_ClusterFunction(0) {
- token.clear();
- SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent);
+ CacheContinuation(): magicno(MagicNo) { // NOTE(review): plain members declared before 'cs' (e.g. url_md5, past_probes) are not zeroed here
+ size_to_init = sizeof(CacheContinuation) - (size_t) & ((CacheContinuation *) 0)->cs; // bytes from 'cs' to the end of the object; NOTE(review): null-pointer offset idiom is formally undefined
+ memset((char *) &cs, 0, size_to_init); // zero 'cs' and every member declared after it
}
inline static bool is_ClusterThread(EThread * et)
@@ -334,14 +356,66 @@ CacheContinuation():
static void cacheContAllocator_free(CacheContinuation *);
inkcoreapi static Action *callback_failure(Action *, int, int, CacheContinuation * this_cc = 0);
static Action *do_remote_lookup(Continuation *, CacheKey *, CacheContinuation *, CacheFragType, char *, int);
- inkcoreapi static Action *do_op(Continuation *, ClusterMachine *, void *, int, char *, int,
- int nbytes = -1, MIOBuffer * b = 0);
+ inkcoreapi static Action *do_op(Continuation * c, ClusterSession cs, void *args,
+ int user_opcode, IOBufferData *data, int data_len, int nbytes = -1, MIOBuffer * b = 0);
static int setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action **);
static void disposeOfDataBuffer(void *buf);
static int handleDisposeEvent(int event, CacheContinuation * cc);
- static int32_t getObjectSize(VConnection *, int, CacheHTTPInfo *);
+ int32_t getObjectSize(VConnection *, int, CacheHTTPInfo *);
};
+extern ClassAllocator<CacheContinuation> cacheContAllocator; // pool used by new_CacheCont()/free_CacheCont()
+
+inline CacheContinuation * // allocate a continuation with a fresh mutex, recording 't' as its thread
+new_CacheCont(EThread *t) {
+ ink_assert(t == this_ethread()); // must be called on 't' itself
+ CacheContinuation *cc = cacheContAllocator.alloc();
+ cc->thread = t;
+ cc->mutex = new_ProxyMutex();
+ cc->start_time = ink_get_hrtime();
+ return cc;
+}
+
+inline void // release 'c': cancel pending work, close/abort its cache VC, drop buffers and headers, return it to cacheContAllocator
+free_CacheCont(CacheContinuation *c) {
+ ink_assert(c->magicno == (int) c->MagicNo && !c->expect_next); // magicno is set to -1 below, so a second free trips this
+// ink_assert(!c->cache_op_ClusterFunction);
+ if (c->pending_action) {
+ c->pending_action->cancel();
+ c->pending_action = NULL;
+ }
+ if (c->cache_vc) { // a READ vio is closed; any other op is aborted
+ if (c->cache_vc->vio.op == VIO::READ)
+ c->cache_vc->do_io(VIO::CLOSE);
+ else
+ c->cache_vc->do_io(VIO::ABORT);
+ c->cache_vc = NULL;
+ }
+ if (c->mbuf) {
+ free_MIOBuffer(c->mbuf);
+ c->mbuf = NULL;
+ }
+
+ c->magicno = -1;
+ c->token.clear();
+ c->cache_vc_info.clear();
+ if (c->ic_params) {
+ delete c->ic_params;
+ c->ic_params = 0;
+ }
+ c->ic_request.clear();
+ c->ic_old_info.clear(); // NOTE(review): old info is only cleared while new info is destroyed — confirm ownership
+ c->ic_new_info.destroy();
+ c->ic_arena.reset();
+ c->freeMsgBuffer();
+ c->ic_hostname = 0; // NOTE(review): pointer reset only, not freed — presumably arena memory; confirm
+ c->mutex.clear();
+
+ c->doc_data = NULL;
+
+ cacheContAllocator.free(c);
+}
+
/////////////////////////////////////////
// Cache OP specific args for do_op() //
/////////////////////////////////////////
@@ -595,18 +669,19 @@ struct CacheOpReplyMsg:public ClusterMessageHeader
{
uint32_t seq_number;
int32_t result;
- ClusterVCToken token;
- bool is_ram_cache_hit; // Entire object was from ram cache
- Alias32 moi; // Used by CACHE_OPEN_READ & CACHE_LINK reply
+ int32_t h_len;
+ int32_t d_len;
+ int32_t reason; // Used by CACHE_OPEN_READ & CACHE_LINK reply
+ int64_t doc_size;
+
enum
{
MIN_VERSION = 1,
MAX_VERSION = 1,
CACHE_OP_REPLY_MESSAGE_VERSION = MAX_VERSION
};
- CacheOpReplyMsg(uint16_t vers = CACHE_OP_REPLY_MESSAGE_VERSION)
- : ClusterMessageHeader(vers), seq_number(0), result(0), is_ram_cache_hit(false) {
- moi.u32 = 0;
+ CacheOpReplyMsg(uint16_t vers = CACHE_OP_REPLY_MESSAGE_VERSION):
+ ClusterMessageHeader(vers), seq_number(0), result(0), h_len(0), d_len(0), reason(0), doc_size(0) {
}
//////////////////////////////////////////////////////////////////////////
@@ -617,7 +692,7 @@ struct CacheOpReplyMsg:public ClusterMessageHeader
}
static int sizeof_fixedlen_msg()
{
- return (int) ALIGN_DOUBLE(offsetof(CacheOpReplyMsg, moi));
+ return INK_ALIGN(sizeof (CacheOpReplyMsg), 16);
}
void init(uint16_t vers = CACHE_OP_REPLY_MESSAGE_VERSION) {
_init(vers);
@@ -627,12 +702,12 @@ struct CacheOpReplyMsg:public ClusterMessageHeader
if (NeedByteSwap()) {
ats_swap32(&seq_number);
ats_swap32((uint32_t *) & result);
- token.SwapBytes();
+ ats_swap32((uint32_t *) & h_len); ats_swap32((uint32_t *) & d_len); // every multi-byte field must be swapped
+ ats_swap32((uint32_t *) & reason); ats_swap64((uint64_t *) & doc_size);
}
}
//////////////////////////////////////////////////////////////////////////
};
-
inline int
maxval(int a, int b)
{
@@ -795,6 +870,7 @@ event_reply_may_have_moi(int event)
{
switch (event) {
case CACHE_EVENT_OPEN_READ:
+ case CACHE_EVENT_OPEN_WRITE:
case CACHE_EVENT_LINK:
case CACHE_EVENT_LINK_FAILED:
case CACHE_EVENT_OPEN_READ_FAILED: