You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2015/03/23 21:33:11 UTC
[38/52] [partial] trafficserver git commit: TS-3419 Fix some enum's
such that clang-format can handle it the way we want. Basically this means
having a trailing ,
on short enum's. TS-3419 Run clang-format over most of the source
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterConfig.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterConfig.cc b/iocore/cluster/ClusterConfig.cc
index 68bee6b..cac0014 100644
--- a/iocore/cluster/ClusterConfig.cc
+++ b/iocore/cluster/ClusterConfig.cc
@@ -32,13 +32,8 @@
int cluster_port = DEFAULT_CLUSTER_PORT_NUMBER;
ClusterAccept::ClusterAccept(int *port, int send_bufsize, int recv_bufsize)
- : Continuation(0),
- p_cluster_port(port),
- socket_send_bufsize(send_bufsize),
- socket_recv_bufsize(recv_bufsize),
- current_cluster_port(-1),
- accept_action(0),
- periodic_event(0)
+ : Continuation(0), p_cluster_port(port), socket_send_bufsize(send_bufsize), socket_recv_bufsize(recv_bufsize),
+ current_cluster_port(-1), accept_action(0), periodic_event(0)
{
mutex = new_ProxyMutex();
SET_HANDLER(&ClusterAccept::ClusterAcceptEvent);
@@ -86,54 +81,50 @@ int
ClusterAccept::ClusterAcceptEvent(int event, void *data)
{
switch (event) {
- case EVENT_IMMEDIATE:
- {
- ShutdownDelete();
- return EVENT_DONE;
- }
- case EVENT_INTERVAL:
- {
- int cluster_port = *p_cluster_port;
-
- if (cluster_port != current_cluster_port) {
- // Configuration changed cluster port, redo accept on new port.
- if (accept_action) {
- accept_action->cancel();
- accept_action = 0;
- }
+ case EVENT_IMMEDIATE: {
+ ShutdownDelete();
+ return EVENT_DONE;
+ }
+ case EVENT_INTERVAL: {
+ int cluster_port = *p_cluster_port;
+
+ if (cluster_port != current_cluster_port) {
+ // Configuration changed cluster port, redo accept on new port.
+ if (accept_action) {
+ accept_action->cancel();
+ accept_action = 0;
+ }
- NetProcessor::AcceptOptions opt;
- opt.recv_bufsize = socket_recv_bufsize;
- opt.send_bufsize = socket_send_bufsize;
- opt.etype = ET_CLUSTER;
- opt.local_port = cluster_port;
- opt.ip_family = AF_INET;
- opt.localhost_only = false;
-
- accept_action = netProcessor.main_accept(this, NO_FD, opt);
- if (!accept_action) {
- Warning("Unable to accept cluster connections on port: %d", cluster_port);
- } else {
- current_cluster_port = cluster_port;
- }
+ NetProcessor::AcceptOptions opt;
+ opt.recv_bufsize = socket_recv_bufsize;
+ opt.send_bufsize = socket_send_bufsize;
+ opt.etype = ET_CLUSTER;
+ opt.local_port = cluster_port;
+ opt.ip_family = AF_INET;
+ opt.localhost_only = false;
+
+ accept_action = netProcessor.main_accept(this, NO_FD, opt);
+ if (!accept_action) {
+ Warning("Unable to accept cluster connections on port: %d", cluster_port);
+ } else {
+ current_cluster_port = cluster_port;
}
- return EVENT_CONT;
- }
- case NET_EVENT_ACCEPT:
- {
- ClusterAcceptMachine((NetVConnection *) data);
- return EVENT_DONE;
- }
- default:
- {
- Warning("ClusterAcceptEvent: received unknown event %d", event);
- return EVENT_DONE;
}
- } // End of switch
+ return EVENT_CONT;
+ }
+ case NET_EVENT_ACCEPT: {
+ ClusterAcceptMachine((NetVConnection *)data);
+ return EVENT_DONE;
+ }
+ default: {
+ Warning("ClusterAcceptEvent: received unknown event %d", event);
+ return EVENT_DONE;
+ }
+ } // End of switch
}
int
-ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
+ClusterAccept::ClusterAcceptMachine(NetVConnection *NetVC)
{
// Validate remote IP address.
unsigned int remote_ip = NetVC->get_remote_ip();
@@ -155,7 +146,7 @@ ClusterAccept::ClusterAcceptMachine(NetVConnection * NetVC)
}
static void
-make_cluster_connections(MachineList * l)
+make_cluster_connections(MachineList *l)
{
//
// Connect to all new machines.
@@ -179,8 +170,7 @@ make_cluster_connections(MachineList * l)
}
int
-machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data,
- void *cookie)
+machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, void *cookie)
{
// Handle changes to the cluster.config or machines.config
// file. cluster.config is the list of machines in the
@@ -189,11 +179,11 @@ machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type
// This may include front-end load redirectors, machines going
// up or coming down etc.
//
- char *filename = (char *) data.rec_string;
+ char *filename = (char *)data.rec_string;
MachineList *l = read_MachineList(filename);
MachineList *old = NULL;
#ifdef USE_SEPARATE_MACHINE_CONFIG
- switch ((int) cookie) {
+ switch ((int)cookie) {
case MACHINE_CONFIG:
old = machines_config;
machines_config = l;
@@ -205,7 +195,7 @@ machine_config_change(const char * /* name ATS_UNUSED */, RecDataT /* data_type
break;
}
#else
- (void) cookie;
+ (void)cookie;
old = cluster_config;
machines_config = l;
cluster_config = l;
@@ -229,7 +219,7 @@ do_machine_config_change(void *d, const char *s)
/*************************************************************************/
// ClusterConfiguration member functions (Public Class)
/*************************************************************************/
-ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0)
+ClusterConfiguration::ClusterConfiguration() : n_machines(0), changed(0)
{
memset(machines, 0, sizeof(machines));
memset(hash_table, 0, sizeof(hash_table));
@@ -239,40 +229,39 @@ ClusterConfiguration::ClusterConfiguration():n_machines(0), changed(0)
// ConfigurationContinuation member functions (Internal Class)
/*************************************************************************/
struct ConfigurationContinuation;
-typedef int (ConfigurationContinuation::*CfgContHandler) (int, void *);
-struct ConfigurationContinuation: public Continuation
-{
+typedef int (ConfigurationContinuation::*CfgContHandler)(int, void *);
+struct ConfigurationContinuation : public Continuation {
ClusterConfiguration *c;
ClusterConfiguration *prev;
int
- zombieEvent(int /* event ATS_UNUSED */, Event * e)
+ zombieEvent(int /* event ATS_UNUSED */, Event *e)
{
- prev->link.next = NULL; // remove that next pointer
- SET_HANDLER((CfgContHandler) & ConfigurationContinuation::dieEvent);
+ prev->link.next = NULL; // remove that next pointer
+ SET_HANDLER((CfgContHandler)&ConfigurationContinuation::dieEvent);
e->schedule_in(CLUSTER_CONFIGURATION_ZOMBIE);
return EVENT_CONT;
}
int
- dieEvent(int event, Event * e)
+ dieEvent(int event, Event *e)
{
- (void) event;
- (void) e;
+ (void)event;
+ (void)e;
delete c;
delete this;
return EVENT_DONE;
}
- ConfigurationContinuation(ClusterConfiguration * cc, ClusterConfiguration * aprev)
- : Continuation(NULL), c(cc), prev(aprev) {
+ ConfigurationContinuation(ClusterConfiguration *cc, ClusterConfiguration *aprev) : Continuation(NULL), c(cc), prev(aprev)
+ {
mutex = new_ProxyMutex();
- SET_HANDLER((CfgContHandler) & ConfigurationContinuation::zombieEvent);
+ SET_HANDLER((CfgContHandler)&ConfigurationContinuation::zombieEvent);
}
};
static void
-free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev)
+free_configuration(ClusterConfiguration *c, ClusterConfiguration *prev)
{
//
// Delete the configuration after a time.
@@ -286,7 +275,7 @@ free_configuration(ClusterConfiguration * c, ClusterConfiguration * prev)
}
ClusterConfiguration *
-configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
+configuration_add_machine(ClusterConfiguration *c, ClusterMachine *m)
{
// Build a new cluster configuration with the new machine.
// Machines are stored in ip sorted order.
@@ -318,7 +307,7 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
ink_assert(cc->n_machines < CLUSTER_MAX_MACHINES);
build_cluster_hash_table(cc);
- INK_MEMORY_BARRIER; // commit writes before freeing old hash table
+ INK_MEMORY_BARRIER; // commit writes before freeing old hash table
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
free_configuration(c, cc);
@@ -326,7 +315,7 @@ configuration_add_machine(ClusterConfiguration * c, ClusterMachine * m)
}
ClusterConfiguration *
-configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
+configuration_remove_machine(ClusterConfiguration *c, ClusterMachine *m)
{
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
@@ -349,7 +338,7 @@ configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
cc->changed = ink_get_hrtime();
build_cluster_hash_table(cc);
- INK_MEMORY_BARRIER; // commit writes before freeing old hash table
+ INK_MEMORY_BARRIER; // commit writes before freeing old hash table
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONFIGURATION_CHANGES_STAT);
free_configuration(c, cc);
@@ -365,16 +354,14 @@ configuration_remove_machine(ClusterConfiguration * c, ClusterMachine * m)
// owner (machine now as opposed to in the past).
//
ClusterMachine *
-cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine ** past_probes)
+cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine **past_probes)
{
#ifdef CLUSTER_TOMCAT
if (!cache_clustering_enabled)
return NULL;
#endif
- ClusterConfiguration *
- cc = this_cluster()->current_configuration();
- ClusterConfiguration *
- next_cc = cc;
+ ClusterConfiguration *cc = this_cluster()->current_configuration();
+ ClusterConfiguration *next_cc = cc;
ink_hrtime now = ink_get_hrtime();
int fake_probe_depth = 0;
int &probe_depth = pprobe_depth ? (*pprobe_depth) : fake_probe_depth;
@@ -413,14 +400,12 @@ cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine **
continue;
}
- ClusterMachine *
- m = cc->machine_hash(hash);
+ ClusterMachine *m = cc->machine_hash(hash);
// If it is not this machine, or a machine we have done before
// and one that is still up, try again
//
- bool
- ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead);
+ bool ok = !(m == this_cluster_machine() || (past_probes && machine_in_vector(m, past_probes, probe_depth)) || m->dead);
// Store the all but the last probe, so that we never return
// the same machine
@@ -431,7 +416,7 @@ cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine **
if (!ok) {
if (!pprobe_depth)
- break; // don't go down if we don't have a depth
+ break; // don't go down if we don't have a depth
continue;
}
@@ -447,9 +432,9 @@ cluster_machine_at_depth(unsigned int hash, int *pprobe_depth, ClusterMachine **
// stored in the ClusterMachine structures
//
void
-initialize_thread_for_cluster(EThread * e)
+initialize_thread_for_cluster(EThread *e)
{
- (void) e;
+ (void)e;
}
/*************************************************************************/
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterHandler.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterHandler.cc b/iocore/cluster/ClusterHandler.cc
index afb0156..979bc8d 100644
--- a/iocore/cluster/ClusterHandler.cc
+++ b/iocore/cluster/ClusterHandler.cc
@@ -55,15 +55,15 @@ static int dump_msgs = 0;
// VERIFY_PETERS_DATA support code
/////////////////////////////////////////
#ifdef VERIFY_PETERS_DATA
-#define DO_VERIFY_PETERS_DATA(_p,_l) verify_peters_data(_p,_l)
+#define DO_VERIFY_PETERS_DATA(_p, _l) verify_peters_data(_p, _l)
#else
-#define DO_VERIFY_PETERS_DATA(_p,_l)
+#define DO_VERIFY_PETERS_DATA(_p, _l)
#endif
void
verify_peters_data(char *ap, int l)
{
- unsigned char *p = (unsigned char *) ap;
+ unsigned char *p = (unsigned char *)ap;
for (int i = 0; i < l - 1; i++) {
unsigned char x1 = p[i];
unsigned char x2 = p[i + 1];
@@ -134,71 +134,22 @@ verify_peters_data(char *ap, int l)
/*************************************************************************/
ClusterHandler::ClusterHandler()
- : net_vc(0),
- thread(0),
- ip(0),
- port(0),
- hostname(NULL),
- machine(NULL),
- ifd(-1),
- id(-1),
- dead(true),
- downing(false),
- active(false),
- on_stolen_thread(false),
- n_channels(0),
- channels(NULL),
- channel_data(NULL),
- connector(false),
- cluster_connect_state(ClusterHandler::CLCON_INITIAL),
- needByteSwap(false),
- configLookupFails(0),
- cluster_periodic_event(0),
- read(this, true),
- write(this, false),
- current_time(0),
- last(0),
- last_report(0),
- n_since_last_report(0),
- last_cluster_op_enable(0),
- last_trace_dump(0),
- clm(0),
- disable_remote_cluster_ops(0),
- pw_write_descriptors_built(0),
- pw_freespace_descriptors_built(0),
- pw_controldata_descriptors_built(0), pw_time_expired(0), started_on_stolen_thread(false), control_message_write(false)
+ : net_vc(0), thread(0), ip(0), port(0), hostname(NULL), machine(NULL), ifd(-1), id(-1), dead(true), downing(false), active(false),
+ on_stolen_thread(false), n_channels(0), channels(NULL), channel_data(NULL), connector(false),
+ cluster_connect_state(ClusterHandler::CLCON_INITIAL), needByteSwap(false), configLookupFails(0), cluster_periodic_event(0),
+ read(this, true), write(this, false), current_time(0), last(0), last_report(0), n_since_last_report(0),
+ last_cluster_op_enable(0), last_trace_dump(0), clm(0), disable_remote_cluster_ops(0), pw_write_descriptors_built(0),
+ pw_freespace_descriptors_built(0), pw_controldata_descriptors_built(0), pw_time_expired(0), started_on_stolen_thread(false),
+ control_message_write(false)
#ifdef CLUSTER_STATS
- ,
- _vc_writes(0),
- _vc_write_bytes(0),
- _control_write_bytes(0),
- _dw_missed_lock(0),
- _dw_not_enabled(0),
- _dw_wait_remote_fill(0),
- _dw_no_active_vio(0),
- _dw_not_enabled_or_no_write(0),
- _dw_set_data_pending(0),
- _dw_no_free_space(0),
- _fw_missed_lock(0),
- _fw_not_enabled(0),
- _fw_wait_remote_fill(0),
- _fw_no_active_vio(0),
- _fw_not_enabled_or_no_read(0),
- _process_read_calls(0),
- _n_read_start(0),
- _n_read_header(0),
- _n_read_await_header(0),
- _n_read_setup_descriptor(0),
- _n_read_descriptor(0),
- _n_read_await_descriptor(0),
- _n_read_setup_data(0),
- _n_read_data(0),
- _n_read_await_data(0),
- _n_read_post_complete(0),
- _n_read_complete(0),
- _process_write_calls(0),
- _n_write_start(0),
- _n_write_setup(0), _n_write_initiate(0), _n_write_await_completion(0), _n_write_post_complete(0), _n_write_complete(0)
+ ,
+ _vc_writes(0), _vc_write_bytes(0), _control_write_bytes(0), _dw_missed_lock(0), _dw_not_enabled(0), _dw_wait_remote_fill(0),
+ _dw_no_active_vio(0), _dw_not_enabled_or_no_write(0), _dw_set_data_pending(0), _dw_no_free_space(0), _fw_missed_lock(0),
+ _fw_not_enabled(0), _fw_wait_remote_fill(0), _fw_no_active_vio(0), _fw_not_enabled_or_no_read(0), _process_read_calls(0),
+ _n_read_start(0), _n_read_header(0), _n_read_await_header(0), _n_read_setup_descriptor(0), _n_read_descriptor(0),
+ _n_read_await_descriptor(0), _n_read_setup_data(0), _n_read_data(0), _n_read_await_data(0), _n_read_post_complete(0),
+ _n_read_complete(0), _process_write_calls(0), _n_write_start(0), _n_write_setup(0), _n_write_initiate(0),
+ _n_write_await_completion(0), _n_write_post_complete(0), _n_write_complete(0)
#endif
{
#ifdef MSG_TRACE
@@ -207,26 +158,24 @@ ClusterHandler::ClusterHandler()
// we need to lead by at least 1
min_priority = 1;
- SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent);
+ SET_HANDLER((ClusterContHandler)&ClusterHandler::startClusterEvent);
mutex = new_ProxyMutex();
OutgoingControl oc;
int n;
for (n = 0; n < CLUSTER_CMSG_QUEUES; ++n) {
- ink_atomiclist_init(&outgoing_control_al[n], "OutGoingControlQueue", (char *) &oc.link.next - (char *) &oc);
+ ink_atomiclist_init(&outgoing_control_al[n], "OutGoingControlQueue", (char *)&oc.link.next - (char *)&oc);
}
IncomingControl ic;
- ink_atomiclist_init(&external_incoming_control,
- "ExternalIncomingControlQueue", (char *) &ic.link.next - (char *) &ic);
+ ink_atomiclist_init(&external_incoming_control, "ExternalIncomingControlQueue", (char *)&ic.link.next - (char *)&ic);
ClusterVConnection ivc;
- ink_atomiclist_init(&external_incoming_open_local,
- "ExternalIncomingOpenLocalQueue", (char *) &ivc.link.next - (char *) &ivc);
+ ink_atomiclist_init(&external_incoming_open_local, "ExternalIncomingOpenLocalQueue", (char *)&ivc.link.next - (char *)&ivc);
ink_atomiclist_init(&read_vcs_ready, "ReadVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next));
ink_atomiclist_init(&write_vcs_ready, "WriteVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next));
- memset((char *) &callout_cont[0], 0, sizeof(callout_cont));
- memset((char *) &callout_events[0], 0, sizeof(callout_events));
+ memset((char *)&callout_cont[0], 0, sizeof(callout_cont));
+ memset((char *)&callout_events[0], 0, sizeof(callout_events));
}
ClusterHandler::~ClusterHandler()
@@ -260,11 +209,11 @@ ClusterHandler::~ClusterHandler()
channel_data = NULL;
}
if (read_vcs)
- delete[]read_vcs;
+ delete[] read_vcs;
read_vcs = NULL;
if (write_vcs)
- delete[]write_vcs;
+ delete[] write_vcs;
write_vcs = NULL;
if (clm) {
@@ -277,7 +226,7 @@ ClusterHandler::~ClusterHandler()
}
void
-ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
+ClusterHandler::close_ClusterVConnection(ClusterVConnection *vc)
{
//
// Close down a ClusterVConnection
@@ -315,7 +264,6 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
ink_assert(!vc->write_bytes_in_transit);
if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->ch) {
-
CloseMessage msg;
int vers = CloseMessage::protoToVersion(vc->ch->machine->msg_proto_major);
void *data;
@@ -326,7 +274,7 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
msg.status = (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL) ? FORCE_CLOSE_ON_OPEN_CHANNEL : vc->closed;
msg.lerrno = vc->lerrno;
msg.sequence_number = vc->token.sequence_number;
- data = (void *) &msg;
+ data = (void *)&msg;
len = sizeof(CloseMessage);
} else {
@@ -349,41 +297,41 @@ ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc)
}
inline bool
-ClusterHandler::vc_ok_write(ClusterVConnection * vc)
+ClusterHandler::vc_ok_write(ClusterVConnection *vc)
{
- return (((vc->closed > 0)
- && (vc->write_list || vc->write_bytes_in_transit)) ||
+ return (((vc->closed > 0) && (vc->write_list || vc->write_bytes_in_transit)) ||
(!vc->closed && vc->write.enabled && vc->write.vio.op == VIO::WRITE && vc->write.vio.buffer.writer()));
}
inline bool
-ClusterHandler::vc_ok_read(ClusterVConnection * vc)
+ClusterHandler::vc_ok_read(ClusterVConnection *vc)
{
return (!vc->closed && vc->read.vio.op == VIO::READ && vc->read.vio.buffer.writer());
}
void
-ClusterHandler::close_free_lock(ClusterVConnection * vc, ClusterVConnState * s)
+ClusterHandler::close_free_lock(ClusterVConnection *vc, ClusterVConnState *s)
{
Ptr<ProxyMutex> m(s->vio.mutex);
if (s == &vc->read) {
- if ((ProxyMutex *) vc->read_locked)
+ if ((ProxyMutex *)vc->read_locked)
MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
vc->read_locked = NULL;
} else {
- if ((ProxyMutex *) vc->write_locked)
+ if ((ProxyMutex *)vc->write_locked)
MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
vc->write_locked = NULL;
}
close_ClusterVConnection(vc);
}
-bool ClusterHandler::build_data_vector(char *d, int len, bool read_flag)
+bool
+ClusterHandler::build_data_vector(char *d, int len, bool read_flag)
{
// Internal interface to general network i/o facility allowing
// single vector read/write to static data buffer.
- ClusterState & s = (read_flag ? read : write);
+ ClusterState &s = (read_flag ? read : write);
ink_assert(d);
ink_assert(len);
ink_assert(s.iov);
@@ -409,7 +357,8 @@ bool ClusterHandler::build_data_vector(char *d, int len, bool read_flag)
return true;
}
-bool ClusterHandler::build_initial_vector(bool read_flag)
+bool
+ClusterHandler::build_initial_vector(bool read_flag)
{
//
// Build initial read/write struct iovec and corresponding IOBufferData
@@ -447,7 +396,7 @@ bool ClusterHandler::build_initial_vector(bool read_flag)
// MIOBuffer *w;
ink_hrtime now = ink_get_hrtime();
- ClusterState & s = (read_flag ? read : write);
+ ClusterState &s = (read_flag ? read : write);
OutgoingControl *oc = s.msg.outgoing_control.head;
IncomingControl *ic = incoming_control.head;
int new_n_iov = 0;
@@ -512,8 +461,7 @@ bool ClusterHandler::build_initial_vector(bool read_flag)
// Note: We are assuming that free space descriptors follow
// the data descriptors.
//////////////////////////////////////////////////////////////
- for (i = 0; i<(read_flag ? ((s.msg.state>= 2) ? s.msg.count : 0)
- : s.msg.count); i++) {
+ for (i = 0; i < (read_flag ? ((s.msg.state >= 2) ? s.msg.count : 0) : s.msg.count); i++) {
if (s.msg.descriptor[i].type == CLUSTER_SEND_DATA) {
///////////////////////////////////
// Control channel data
@@ -533,7 +481,7 @@ bool ClusterHandler::build_initial_vector(bool read_flag)
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_RECVD_STAT);
}
// Mark message data as invalid
- *((uint32_t *) ic->data) = UNDEFINED_CLUSTER_FUNCTION;
+ *((uint32_t *)ic->data) = UNDEFINED_CLUSTER_FUNCTION;
incoming_control.enqueue(ic);
}
s.iov[new_n_iov].iov_base = 0;
@@ -541,7 +489,7 @@ bool ClusterHandler::build_initial_vector(bool read_flag)
s.block[new_n_iov] = ic->get_block();
to_do += s.iov[new_n_iov].iov_len;
++new_n_iov;
- ic = (IncomingControl *) ic->link.next;
+ ic = (IncomingControl *)ic->link.next;
} else {
///////////////////////
// Outgoing Control
@@ -552,23 +500,21 @@ bool ClusterHandler::build_initial_vector(bool read_flag)
s.block[new_n_iov] = oc->get_block();
to_do += s.iov[new_n_iov].iov_len;
++new_n_iov;
- oc = (OutgoingControl *) oc->link.next;
+ oc = (OutgoingControl *)oc->link.next;
}
} else {
///////////////////////////////
// User channel data
///////////////////////////////
- ClusterVConnection *
- vc = channels[s.msg.descriptor[i].channel];
+ ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];
- if (VALID_CHANNEL(vc) &&
- (s.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
+ if (VALID_CHANNEL(vc) && (s.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
if (read_flag) {
ink_release_assert(!vc->initial_data_bytes);
/////////////////////////////////////
// Try to get the read VIO mutex
/////////////////////////////////////
- ink_release_assert(!(ProxyMutex *) vc->read_locked);
+ ink_release_assert(!(ProxyMutex *)vc->read_locked);
#ifdef CLUSTER_TOMCAT
if (!vc->read.vio.mutex ||
!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT))
@@ -616,21 +562,21 @@ bool ClusterHandler::build_initial_vector(bool read_flag)
bool remote_write_fill = (vc->pending_remote_fill && vc->remote_write_block);
// Sanity check, assert we have the lock
if (!remote_write_fill) {
- ink_assert((ProxyMutex *) vc->write_locked);
+ ink_assert((ProxyMutex *)vc->write_locked);
}
if (vc_ok_write(vc) || remote_write_fill) {
if (remote_write_fill) {
s.iov[new_n_iov].iov_base = 0;
- ink_release_assert((int) s.msg.descriptor[i].length == bytes_IOBufferBlockList(vc->remote_write_block, 1));
+ ink_release_assert((int)s.msg.descriptor[i].length == bytes_IOBufferBlockList(vc->remote_write_block, 1));
s.block[new_n_iov] = vc->remote_write_block;
} else {
s.iov[new_n_iov].iov_base = 0;
- ink_release_assert((int) s.msg.descriptor[i].length <= vc->write_list_bytes);
+ ink_release_assert((int)s.msg.descriptor[i].length <= vc->write_list_bytes);
s.block[new_n_iov] = vc->write_list;
- vc->write_list = consume_IOBufferBlockList(vc->write_list, (int) s.msg.descriptor[i].length);
- vc->write_list_bytes -= (int) s.msg.descriptor[i].length;
- vc->write_bytes_in_transit += (int) s.msg.descriptor[i].length;
+ vc->write_list = consume_IOBufferBlockList(vc->write_list, (int)s.msg.descriptor[i].length);
+ vc->write_list_bytes -= (int)s.msg.descriptor[i].length;
+ vc->write_bytes_in_transit += (int)s.msg.descriptor[i].length;
vc->write_list_tail = vc->write_list;
while (vc->write_list_tail && vc->write_list_tail->next)
@@ -680,8 +626,8 @@ bool ClusterHandler::build_initial_vector(bool read_flag)
s.n_iov = new_n_iov;
return true;
- // TODO: This is apparently dead code, I added the #if 0 to avoid compiler
- // warnings, but is this really intentional??
+// TODO: This is apparently dead code, I added the #if 0 to avoid compiler
+// warnings, but is this really intentional??
#if 0
// Release all IOBufferBlock references.
for (n = 0; n < MAX_TCOUNT; ++n) {
@@ -694,22 +640,23 @@ bool ClusterHandler::build_initial_vector(bool read_flag)
#endif
}
-bool ClusterHandler::get_read_locks()
+bool
+ClusterHandler::get_read_locks()
{
///////////////////////////////////////////////////////////////////////
// Reacquire locks for the request setup by build_initial_vector().
// We are called after each read completion prior to posting completion
///////////////////////////////////////////////////////////////////////
- ClusterState & s = read;
+ ClusterState &s = read;
int i, n;
int bytes_processed;
int vec_bytes_remainder;
int iov_done[MAX_TCOUNT];
- memset((char *) iov_done, 0, sizeof(int) * MAX_TCOUNT);
+ memset((char *)iov_done, 0, sizeof(int) * MAX_TCOUNT);
// Compute bytes transferred on a per vector basis
- bytes_processed = s.did - s.bytes_xfered; // not including bytes in this xfer
+ bytes_processed = s.did - s.bytes_xfered; // not including bytes in this xfer
i = -1;
for (n = 0; n < s.n_iov; ++n) {
@@ -719,7 +666,7 @@ bool ClusterHandler::get_read_locks()
} else {
iov_done[n] = s.iov[n].iov_len + bytes_processed;
if (i < 0) {
- i = n; // note i/o start vector
+ i = n; // note i/o start vector
// Now at vector where last transfer started,
// make considerations for the last transfer on this vector.
@@ -748,25 +695,21 @@ bool ClusterHandler::get_read_locks()
// the data descriptors.
for (; i < s.n_iov; ++i) {
- if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA)
- && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {
-
+ if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA) && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {
// Only user channels require locks
- ClusterVConnection *
- vc = channels[s.msg.descriptor[i].channel];
- if (!VALID_CHANNEL(vc) ||
- ((s.msg.descriptor[i].sequence_number) !=
- CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) || !vc_ok_read(vc)) {
+ ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];
+ if (!VALID_CHANNEL(vc) || ((s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) ||
+ !vc_ok_read(vc)) {
// Channel no longer valid, lock not needed since we
// already have a reference to the buffer
continue;
}
- ink_assert(!(ProxyMutex *) vc->read_locked);
+ ink_assert(!(ProxyMutex *)vc->read_locked);
vc->read_locked = vc->read.vio.mutex;
- if (vc->byte_bank_q.head
- || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) {
+ if (vc->byte_bank_q.head ||
+ !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) {
// Pending byte bank completions or lock acquire failure.
vc->read_locked = NULL;
@@ -784,7 +727,7 @@ bool ClusterHandler::get_read_locks()
int64_t read_avail = vc->read_block->read_avail();
if (!vc->pending_remote_fill && read_avail) {
- Debug("cluster_vc_xfer", "Deferred fill ch %d %p %" PRId64" bytes", vc->channel, vc, read_avail);
+ Debug("cluster_vc_xfer", "Deferred fill ch %d %p %" PRId64 " bytes", vc->channel, vc, read_avail);
vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
if (complete_channel_read(read_avail, vc)) {
@@ -793,34 +736,31 @@ bool ClusterHandler::get_read_locks()
}
}
}
- return true; // success
+ return true; // success
}
-bool ClusterHandler::get_write_locks()
+bool
+ClusterHandler::get_write_locks()
{
///////////////////////////////////////////////////////////////////////
// Reacquire locks for the request setup by build_initial_vector().
// We are called after the entire write completes prior to
// posting completion.
///////////////////////////////////////////////////////////////////////
- ClusterState & s = write;
+ ClusterState &s = write;
int i;
for (i = 0; i < s.msg.count; ++i) {
- if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA)
- && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {
-
+ if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA) && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) {
// Only user channels require locks
- ClusterVConnection *
- vc = channels[s.msg.descriptor[i].channel];
- if (!VALID_CHANNEL(vc) ||
- (s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
+ ClusterVConnection *vc = channels[s.msg.descriptor[i].channel];
+ if (!VALID_CHANNEL(vc) || (s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
// Channel no longer valid, lock not needed since we
// already have a reference to the buffer
continue;
}
- ink_assert(!(ProxyMutex *) vc->write_locked);
+ ink_assert(!(ProxyMutex *)vc->write_locked);
vc->write_locked = vc->write.vio.mutex;
#ifdef CLUSTER_TOMCAT
if (vc->write_locked &&
@@ -861,34 +801,34 @@ ClusterHandler::process_set_data_msgs()
// Process small control set_data messages.
/////////////////////////////////////////////
if (!read.msg.did_small_control_set_data) {
- char *p = (char *) &read.msg.descriptor[read.msg.count];
+ char *p = (char *)&read.msg.descriptor[read.msg.count];
char *endp = p + read.msg.control_bytes;
while (p < endp) {
if (needByteSwap) {
- ats_swap32((uint32_t *) p); // length
- ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code
+ ats_swap32((uint32_t *)p); // length
+ ats_swap32((uint32_t *)(p + sizeof(int32_t))); // function code
}
- int len = *(int32_t *) p;
- cluster_function_index = *(uint32_t *) (p + sizeof(int32_t));
+ int len = *(int32_t *)p;
+ cluster_function_index = *(uint32_t *)(p + sizeof(int32_t));
- if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
- && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
+ if ((cluster_function_index < (uint32_t)SIZE_clusterFunction) &&
+ (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t));
// Mark message as processed.
- *((uint32_t *) (p + sizeof(uint32_t))) = ~*((uint32_t *) (p + sizeof(uint32_t)));
+ *((uint32_t *)(p + sizeof(uint32_t))) = ~*((uint32_t *)(p + sizeof(uint32_t)));
p += (2 * sizeof(uint32_t)) + (len - sizeof(uint32_t));
- p = (char *) DOUBLE_ALIGN(p);
+ p = (char *)DOUBLE_ALIGN(p);
} else {
// Reverse swap since this message will be reprocessed.
if (needByteSwap) {
- ats_swap32((uint32_t *) p); // length
- ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code
+ ats_swap32((uint32_t *)p); // length
+ ats_swap32((uint32_t *)(p + sizeof(int32_t))); // function code
}
- break; // End of set_data messages
+ break; // End of set_data messages
}
}
- read.msg.control_data_offset = p - (char *) &read.msg.descriptor[read.msg.count];
+ read.msg.control_data_offset = p - (char *)&read.msg.descriptor[read.msg.count];
read.msg.did_small_control_set_data = 1;
}
/////////////////////////////////////////////
@@ -899,31 +839,29 @@ ClusterHandler::process_set_data_msgs()
while (ic) {
if (needByteSwap) {
- ats_swap32((uint32_t *) ic->data); // function code
+ ats_swap32((uint32_t *)ic->data); // function code
}
- cluster_function_index = *((uint32_t *) ic->data);
-
- if ((cluster_function_index < (uint32_t) SIZE_clusterFunction)
- && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
+ cluster_function_index = *((uint32_t *)ic->data);
+ if ((cluster_function_index < (uint32_t)SIZE_clusterFunction) &&
+ (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) {
char *p = ic->data;
- clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this,
- (void *) (p + sizeof(int32_t)), ic->len - sizeof(int32_t));
+ clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, (void *)(p + sizeof(int32_t)), ic->len - sizeof(int32_t));
// Reverse swap since this will be processed again for deallocation.
if (needByteSwap) {
- ats_swap32((uint32_t *) p); // length
- ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code
+ ats_swap32((uint32_t *)p); // length
+ ats_swap32((uint32_t *)(p + sizeof(int32_t))); // function code
}
// Mark message as processed.
// Defer dellocation until entire read is complete.
- *((uint32_t *) p) = ~*((uint32_t *) p);
+ *((uint32_t *)p) = ~*((uint32_t *)p);
- ic = (IncomingControl *) ic->link.next;
+ ic = (IncomingControl *)ic->link.next;
} else {
// Reverse swap action this message will be reprocessed.
if (needByteSwap) {
- ats_swap32((uint32_t *) ic->data); // function code
+ ats_swap32((uint32_t *)ic->data); // function code
}
break;
}
@@ -942,8 +880,8 @@ ClusterHandler::process_small_control_msgs()
}
ink_hrtime now = ink_get_hrtime();
- char *p = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_data_offset;
- char *endp = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_bytes;
+ char *p = (char *)&read.msg.descriptor[read.msg.count] + read.msg.control_data_offset;
+ char *endp = (char *)&read.msg.descriptor[read.msg.count] + read.msg.control_bytes;
while (p < endp) {
/////////////////////////////////////////////////////////////////
@@ -951,15 +889,15 @@ ClusterHandler::process_small_control_msgs()
// incoming queue for processing by callout threads.
/////////////////////////////////////////////////////////////////
if (needByteSwap) {
- ats_swap32((uint32_t *) p); // length
- ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code
+ ats_swap32((uint32_t *)p); // length
+ ats_swap32((uint32_t *)(p + sizeof(int32_t))); // function code
}
- int len = *(int32_t *) p;
+ int len = *(int32_t *)p;
p += sizeof(int32_t);
- uint32_t cluster_function_index = *(uint32_t *) p;
+ uint32_t cluster_function_index = *(uint32_t *)p;
ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION);
- if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) {
+ if (cluster_function_index >= (uint32_t)SIZE_clusterFunction) {
Warning("1Bad cluster function index (small control)");
p += len;
@@ -980,11 +918,11 @@ ClusterHandler::process_small_control_msgs()
ic->len = len;
ic->alloc_data();
memcpy(ic->data, p, ic->len);
- SetHighBit(&ic->len); // mark as small cntl
- ink_atomiclist_push(&external_incoming_control, (void *) ic);
+ SetHighBit(&ic->len); // mark as small cntl
+ ink_atomiclist_push(&external_incoming_control, (void *)ic);
p += len;
}
- p = (char *) DOUBLE_ALIGN(p);
+ p = (char *)DOUBLE_ALIGN(p);
}
}
@@ -1006,12 +944,12 @@ ClusterHandler::process_large_control_msgs()
while ((ic = incoming_control.dequeue())) {
if (needByteSwap) {
- ats_swap32((uint32_t *) ic->data); // function code
+ ats_swap32((uint32_t *)ic->data); // function code
}
- cluster_function_index = *((uint32_t *) ic->data);
+ cluster_function_index = *((uint32_t *)ic->data);
ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION);
- if (cluster_function_index == (uint32_t) ~ SET_CHANNEL_DATA_CLUSTER_FUNCTION) {
+ if (cluster_function_index == (uint32_t)~SET_CHANNEL_DATA_CLUSTER_FUNCTION) {
// SET_CHANNEL_DATA_CLUSTER_FUNCTION already processed.
// Just do memory deallocation.
@@ -1020,14 +958,13 @@ ClusterHandler::process_large_control_msgs()
continue;
}
- if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) {
+ if (cluster_function_index >= (uint32_t)SIZE_clusterFunction) {
Warning("Bad cluster function index (large control)");
ic->freeall();
} else if (clusterFunction[cluster_function_index].ClusterFunc) {
// Cluster message, process in ET_CLUSTER thread
- clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
- ic->len - sizeof(int32_t));
+ clusterFunction[cluster_function_index].pfn(this, (void *)(ic->data + sizeof(int32_t)), ic->len - sizeof(int32_t));
// Deallocate memory
if (!clusterFunction[cluster_function_index].fMalloced)
@@ -1035,7 +972,7 @@ ClusterHandler::process_large_control_msgs()
} else {
// Non Cluster message, process in non ET_CLUSTER thread
- ink_atomiclist_push(&external_incoming_control, (void *) ic);
+ ink_atomiclist_push(&external_incoming_control, (void *)ic);
}
}
}
@@ -1072,7 +1009,7 @@ ClusterHandler::process_freespace_msgs()
}
void
-ClusterHandler::add_to_byte_bank(ClusterVConnection * vc)
+ClusterHandler::add_to_byte_bank(ClusterVConnection *vc)
{
ByteBankDescriptor *bb_desc = ByteBankDescriptor::ByteBankDescriptor_alloc(vc->read_block);
bool pending_byte_bank_completion = vc->byte_bank_q.head ? true : false;
@@ -1110,31 +1047,29 @@ ClusterHandler::update_channels_read()
for (i = 0; i < read.msg.count; i++) {
if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
ClusterVConnection *vc = channels[read.msg.descriptor[i].channel];
- if (VALID_CHANNEL(vc) &&
- (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
- vc->last_activity_time = current_time; // note activity time
+ if (VALID_CHANNEL(vc) && (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
+ vc->last_activity_time = current_time; // note activity time
len = read.msg.descriptor[i].length;
if (!len) {
continue;
}
- if (!vc->pending_remote_fill && vc_ok_read(vc)
- && (!((ProxyMutex *) vc->read_locked) || vc->byte_bank_q.head)) {
+ if (!vc->pending_remote_fill && vc_ok_read(vc) && (!((ProxyMutex *)vc->read_locked) || vc->byte_bank_q.head)) {
//
// Byte bank active or unable to acquire lock on VC.
// Move data into the byte bank and attempt delivery
// at the next periodic event.
//
- vc->read_block->fill(len); // note bytes received
+ vc->read_block->fill(len); // note bytes received
add_to_byte_bank(vc);
} else {
- if (vc->pending_remote_fill || ((ProxyMutex *) vc->read_locked && vc_ok_read(vc))) {
- vc->read_block->fill(len); // note bytes received
+ if (vc->pending_remote_fill || ((ProxyMutex *)vc->read_locked && vc_ok_read(vc))) {
+ vc->read_block->fill(len); // note bytes received
if (!vc->pending_remote_fill) {
vc->read.vio.buffer.writer()->append_block(vc->read_block->clone());
- vc->read_block->consume(len); // note bytes moved to user
+ vc->read_block->consume(len); // note bytes moved to user
}
complete_channel_read(len, vc);
}
@@ -1156,7 +1091,7 @@ ClusterHandler::update_channels_read()
// for message processing which cannot be done with a ET_CLUSTER thread.
//
int
-ClusterHandler::process_incoming_callouts(ProxyMutex * m)
+ClusterHandler::process_incoming_callouts(ProxyMutex *m)
{
ProxyMutex *mutex = m;
ink_hrtime now;
@@ -1170,13 +1105,12 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m)
IncomingControl *ic_ext;
while (true) {
- ic_ext = (IncomingControl *)
- ink_atomiclist_popall(&external_incoming_control);
+ ic_ext = (IncomingControl *)ink_atomiclist_popall(&external_incoming_control);
if (!ic_ext)
break;
while (ic_ext) {
- ic_ext_next = (IncomingControl *) ic_ext->link.next;
+ ic_ext_next = (IncomingControl *)ic_ext->link.next;
ic_ext->link.next = NULL;
local_incoming_control.push(ic_ext);
ic_ext = ic_ext_next;
@@ -1191,15 +1125,15 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m)
// Determine if this a small control message
small_control_msg = IsHighBitSet(&ic->len);
- ClearHighBit(&ic->len); // Clear small msg flag bit
+ ClearHighBit(&ic->len); // Clear small msg flag bit
if (small_control_msg) {
int len = ic->len;
char *p = ic->data;
- uint32_t cluster_function_index = *(uint32_t *) p;
+ uint32_t cluster_function_index = *(uint32_t *)p;
p += sizeof(uint32_t);
- if (cluster_function_index < (uint32_t) SIZE_clusterFunction) {
+ if (cluster_function_index < (uint32_t)SIZE_clusterFunction) {
////////////////////////////////
// Invoke processing function
////////////////////////////////
@@ -1216,17 +1150,16 @@ ClusterHandler::process_incoming_callouts(ProxyMutex * m)
} else {
ink_assert(ic->len > 4);
- uint32_t cluster_function_index = *(uint32_t *) ic->data;
+ uint32_t cluster_function_index = *(uint32_t *)ic->data;
bool valid_index;
- if (cluster_function_index < (uint32_t) SIZE_clusterFunction) {
+ if (cluster_function_index < (uint32_t)SIZE_clusterFunction) {
valid_index = true;
////////////////////////////////
// Invoke processing function
////////////////////////////////
ink_assert(!clusterFunction[cluster_function_index].ClusterFunc);
- clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)),
- ic->len - sizeof(int32_t));
+ clusterFunction[cluster_function_index].pfn(this, (void *)(ic->data + sizeof(int32_t)), ic->len - sizeof(int32_t));
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time);
} else {
@@ -1270,7 +1203,7 @@ ClusterHandler::update_channels_partial_read()
if (already_read) {
already_read -= iov_done[i];
if (already_read < 0) {
- iov_done[i] = -already_read; // bytes remaining
+ iov_done[i] = -already_read; // bytes remaining
already_read = 0;
} else {
iov_done[i] = 0;
@@ -1295,10 +1228,9 @@ ClusterHandler::update_channels_partial_read()
for (i = 0; i < read.msg.count; i++) {
if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
ClusterVConnection *vc = channels[read.msg.descriptor[i].channel];
- if (VALID_CHANNEL(vc) &&
- (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
+ if (VALID_CHANNEL(vc) && (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
if (vc->pending_remote_fill || (vc_ok_read(vc) && (vc->iov_map != CLUSTER_IOV_NONE))) {
- vc->last_activity_time = current_time; // note activity time
+ vc->last_activity_time = current_time; // note activity time
ClusterVConnState *s = &vc->read;
ink_assert(vc->iov_map < read.n_iov);
int len = iov_done[vc->iov_map];
@@ -1318,10 +1250,10 @@ ClusterHandler::update_channels_partial_read()
read_all_large_control_msgs = 1;
}
iov_done[vc->iov_map] = 0;
- vc->read_block->fill(len); // note bytes received
+ vc->read_block->fill(len); // note bytes received
if (!vc->pending_remote_fill) {
- if ((ProxyMutex *) vc->read_locked) {
+ if ((ProxyMutex *)vc->read_locked) {
Debug("cluster_vc_xfer", "Partial read, credit ch %d %p %d bytes", vc->channel, vc, len);
s->vio.buffer.writer()->append_block(vc->read_block->clone());
if (complete_channel_read(len, vc)) {
@@ -1333,7 +1265,7 @@ ClusterHandler::update_channels_partial_read()
// into the byte bank. Otherwise, do nothing since
// we will resume the read at this VC.
- if (len == (int) read.msg.descriptor[i].length) {
+ if (len == (int)read.msg.descriptor[i].length) {
Debug("cluster_vc_xfer", "Partial read, byte bank move ch %d %p %d bytes", vc->channel, vc, len);
add_to_byte_bank(vc);
}
@@ -1343,7 +1275,7 @@ ClusterHandler::update_channels_partial_read()
complete_channel_read(len, vc);
}
read.msg.descriptor[i].length -= len;
- ink_assert(((int) read.msg.descriptor[i].length) >= 0);
+ ink_assert(((int)read.msg.descriptor[i].length) >= 0);
}
Debug(CL_TRACE, "partial_channel_read chan=%d len=%d", vc->channel, len);
}
@@ -1352,7 +1284,8 @@ ClusterHandler::update_channels_partial_read()
}
}
-bool ClusterHandler::complete_channel_read(int len, ClusterVConnection * vc)
+bool
+ClusterHandler::complete_channel_read(int len, ClusterVConnection *vc)
{
//
// We have processed a complete VC read request message for a channel,
@@ -1363,22 +1296,21 @@ bool ClusterHandler::complete_channel_read(int len, ClusterVConnection * vc)
if (vc->pending_remote_fill) {
Debug(CL_TRACE, "complete_channel_read chan=%d len=%d", vc->channel, len);
vc->initial_data_bytes += len;
- ++vc->pending_remote_fill; // Note completion
+ ++vc->pending_remote_fill; // Note completion
return (vc->closed ? false : true);
}
if (vc->closed)
- return false; // No action if already closed
+ return false; // No action if already closed
- ink_assert((ProxyMutex *) s->vio.mutex == (ProxyMutex *) s->vio._cont->mutex);
+ ink_assert((ProxyMutex *)s->vio.mutex == (ProxyMutex *)s->vio._cont->mutex);
Debug("cluster_vc_xfer", "Complete read, credit ch %d %p %d bytes", vc->channel, vc, len);
s->vio.ndone += len;
if (s->vio.ntodo() <= 0) {
s->enabled = 0;
- if (cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s)
- == EVENT_DONE)
+ if (cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s) == EVENT_DONE)
return false;
} else {
if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) == EVENT_DONE)
@@ -1401,7 +1333,7 @@ ClusterHandler::finish_delayed_reads()
//
ClusterVConnection *vc = NULL;
DLL<ClusterVConnectionBase> l;
- while ((vc = (ClusterVConnection *) delayed_reads.pop())) {
+ while ((vc = (ClusterVConnection *)delayed_reads.pop())) {
MUTEX_TRY_LOCK_SPIN(lock, vc->read.vio.mutex, thread, READ_LOCK_SPIN_COUNT);
if (lock.is_locked()) {
if (vc_ok_read(vc)) {
@@ -1414,8 +1346,8 @@ ClusterHandler::finish_delayed_reads()
// remove our self to process another byte bank completion
ClusterVC_remove_read(vc);
}
- Debug("cluster_vc_xfer",
- "Delayed read, credit ch %d %p %" PRId64" bytes", vc->channel, vc, d->get_block()->read_avail());
+ Debug("cluster_vc_xfer", "Delayed read, credit ch %d %p %" PRId64 " bytes", vc->channel, vc,
+ d->get_block()->read_avail());
vc->read.vio.buffer.writer()->append_block(d->get_block());
if (complete_channel_read(d->get_block()->read_avail(), vc)) {
@@ -1447,17 +1379,13 @@ ClusterHandler::update_channels_written()
if (write.msg.descriptor[i].type == CLUSTER_SEND_DATA) {
if (write.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) {
ClusterVConnection *vc = channels[write.msg.descriptor[i].channel];
- if (VALID_CHANNEL(vc) &&
- (write.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
-
+ if (VALID_CHANNEL(vc) && (write.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) {
if (vc->pending_remote_fill) {
- Debug(CL_TRACE,
- "update_channels_written chan=%d seqno=%d len=%d",
- write.msg.descriptor[i].channel,
+ Debug(CL_TRACE, "update_channels_written chan=%d seqno=%d len=%d", write.msg.descriptor[i].channel,
write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length);
vc->pending_remote_fill = 0;
vc->remote_write_block = 0; // free data block
- continue; // ignore remote write fill VC(s)
+ continue; // ignore remote write fill VC(s)
}
ClusterVConnState *s = &vc->write;
@@ -1467,8 +1395,8 @@ ClusterHandler::update_channels_written()
Debug(CL_PROTO, "(%d) data sent %d %" PRId64, write.msg.descriptor[i].channel, len, s->vio.ndone);
if (vc_ok_write(vc)) {
- vc->last_activity_time = current_time; // note activity time
- int64_t ndone = vc->was_closed()? 0 : s->vio.ndone;
+ vc->last_activity_time = current_time; // note activity time
+ int64_t ndone = vc->was_closed() ? 0 : s->vio.ndone;
if (ndone < vc->remote_free) {
vcs_push(vc, VC_CLUSTER_WRITE);
@@ -1497,7 +1425,7 @@ ClusterHandler::update_channels_written()
invoke_remote_data_args *args;
OutgoingControl *hdr_oc;
while ((hdr_oc = write.msg.outgoing_callout.dequeue())) {
- args = (invoke_remote_data_args *) (hdr_oc->data + sizeof(int32_t));
+ args = (invoke_remote_data_args *)(hdr_oc->data + sizeof(int32_t));
ink_assert(args->magicno == invoke_remote_data_args::MagicNo);
// Free data descriptor
@@ -1524,7 +1452,7 @@ ClusterHandler::build_write_descriptors()
// write (struct iovec system maximum).
//
int count_bucket = cur_vcs;
- int tcount = write.msg.count + 2; // count + descriptor
+ int tcount = write.msg.count + 2; // count + descriptor
int write_descriptors_built = 0;
int valid;
int list_len = 0;
@@ -1536,7 +1464,7 @@ ClusterHandler::build_write_descriptors()
vc = (ClusterVConnection *)ink_atomiclist_popall(&write_vcs_ready);
while (vc) {
enter_exit(&cls_build_writes_entered, &cls_writes_exited);
- vc_next = (ClusterVConnection *) vc->ready_alink.next;
+ vc_next = (ClusterVConnection *)vc->ready_alink.next;
vc->ready_alink.next = NULL;
list_len++;
if (VC_CLUSTER_CLOSED == vc->type) {
@@ -1561,10 +1489,10 @@ ClusterHandler::build_write_descriptors()
}
tcount = write.msg.count + 2;
- vc_next = (ClusterVConnection *) write_vcs[count_bucket].head;
+ vc_next = (ClusterVConnection *)write_vcs[count_bucket].head;
while (vc_next) {
vc = vc_next;
- vc_next = (ClusterVConnection *) vc->write.link.next;
+ vc_next = (ClusterVConnection *)vc->write.link.next;
if (VC_CLUSTER_CLOSED == vc->type) {
vc->type = VC_NULL;
@@ -1579,10 +1507,8 @@ ClusterHandler::build_write_descriptors()
if (-1 == valid) {
vcs_push(vc, VC_CLUSTER_WRITE);
} else if (valid) {
- ink_assert(vc->write_locked); // Acquired in valid_for_data_write()
- if ((vc->remote_free > (vc->write.vio.ndone - vc->write_list_bytes))
- && channels[vc->channel] == vc) {
-
+ ink_assert(vc->write_locked); // Acquired in valid_for_data_write()
+ if ((vc->remote_free > (vc->write.vio.ndone - vc->write_list_bytes)) && channels[vc->channel] == vc) {
ink_assert(vc->write_list && vc->write_list_bytes);
int d = write.msg.count;
@@ -1630,7 +1556,7 @@ ClusterHandler::build_freespace_descriptors()
// in the list.
//
int count_bucket = cur_vcs;
- int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s)
+ int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s)
int freespace_descriptors_built = 0;
int s = 0;
int list_len = 0;
@@ -1642,7 +1568,7 @@ ClusterHandler::build_freespace_descriptors()
vc = (ClusterVConnection *)ink_atomiclist_popall(&read_vcs_ready);
while (vc) {
enter_exit(&cls_build_reads_entered, &cls_reads_exited);
- vc_next = (ClusterVConnection *) vc->ready_alink.next;
+ vc_next = (ClusterVConnection *)vc->ready_alink.next;
vc->ready_alink.next = NULL;
list_len++;
if (VC_CLUSTER_CLOSED == vc->type) {
@@ -1667,10 +1593,10 @@ ClusterHandler::build_freespace_descriptors()
}
tcount = write.msg.count + 2;
- vc_next = (ClusterVConnection *) read_vcs[count_bucket].head;
+ vc_next = (ClusterVConnection *)read_vcs[count_bucket].head;
while (vc_next) {
vc = vc_next;
- vc_next = (ClusterVConnection *) vc->read.link.next;
+ vc_next = (ClusterVConnection *)vc->read.link.next;
if (VC_CLUSTER_CLOSED == vc->type) {
vc->type = VC_NULL;
@@ -1714,9 +1640,9 @@ ClusterHandler::build_controlmsg_descriptors()
// write (struct iovec system maximum) and for elements already
// in the list.
//
- int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s)
+ int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s)
int control_msgs_built = 0;
- bool compound_msg; // msg + chan data
+ bool compound_msg; // msg + chan data
//
// Build descriptors for control messages
//
@@ -1724,12 +1650,12 @@ ClusterHandler::build_controlmsg_descriptors()
int control_bytes = 0;
int q = 0;
- while (tcount < (MAX_TCOUNT - 1)) { // -1 to allow for compound messages
+ while (tcount < (MAX_TCOUNT - 1)) { // -1 to allow for compound messages
c = outgoing_control[q].pop();
if (!c) {
// Move elements from global outgoing_control to local queue
OutgoingControl *c_next;
- c = (OutgoingControl *) ink_atomiclist_popall(&outgoing_control_al[q]);
+ c = (OutgoingControl *)ink_atomiclist_popall(&outgoing_control_al[q]);
if (c == 0) {
if (++q >= CLUSTER_CMSG_QUEUES) {
break;
@@ -1738,7 +1664,7 @@ ClusterHandler::build_controlmsg_descriptors()
}
}
while (c) {
- c_next = (OutgoingControl *) c->link.next;
+ c_next = (OutgoingControl *)c->link.next;
c->link.next = NULL;
outgoing_control[q].push(c);
c = c_next;
@@ -1746,17 +1672,17 @@ ClusterHandler::build_controlmsg_descriptors()
continue;
} else {
- compound_msg = (*((int32_t *) c->data) == -1); // (msg+chan data)?
+ compound_msg = (*((int32_t *)c->data) == -1); // (msg+chan data)?
}
if (!compound_msg && c->len <= SMALL_CONTROL_MESSAGE &&
// check if the receiving cluster function will want to malloc'ed data
- !clusterFunction[*(int32_t *) c->data].fMalloced && control_bytes + c->len + sizeof(int32_t) * 2 + 7 < CONTROL_DATA) {
+ !clusterFunction[*(int32_t *)c->data].fMalloced && control_bytes + c->len + sizeof(int32_t) * 2 + 7 < CONTROL_DATA) {
write.msg.outgoing_small_control.enqueue(c);
- control_bytes += c->len + sizeof(int32_t) * 2 + 7; // safe approximation
+ control_bytes += c->len + sizeof(int32_t) * 2 + 7; // safe approximation
control_msgs_built++;
- if (clusterFunction[*(int32_t *) c->data].post_pfn) {
- clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
+ if (clusterFunction[*(int32_t *)c->data].post_pfn) {
+ clusterFunction[*(int32_t *)c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
}
continue;
}
@@ -1765,7 +1691,7 @@ ClusterHandler::build_controlmsg_descriptors()
//
if (compound_msg) {
// Extract out components of compound message.
- invoke_remote_data_args *cmhdr = (invoke_remote_data_args *) (c->data + sizeof(int32_t));
+ invoke_remote_data_args *cmhdr = (invoke_remote_data_args *)(c->data + sizeof(int32_t));
OutgoingControl *oc_header = c;
OutgoingControl *oc_msg = cmhdr->msg_oc;
OutgoingControl *oc_data = cmhdr->data_oc;
@@ -1831,7 +1757,7 @@ ClusterHandler::build_controlmsg_descriptors()
oc_msg->freeall();
// Free data descriptor
- oc_data->free_data(); // invoke memory free callback
+ oc_data->free_data(); // invoke memory free callback
oc_data->mutex = 0;
oc_data->freeall();
}
@@ -1852,8 +1778,8 @@ ClusterHandler::build_controlmsg_descriptors()
tcount++;
control_msgs_built++;
- if (clusterFunction[*(int32_t *) c->data].post_pfn) {
- clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
+ if (clusterFunction[*(int32_t *)c->data].post_pfn) {
+ clusterFunction[*(int32_t *)c->data].post_pfn(this, c->data + sizeof(int32_t), c->len);
}
}
}
@@ -1866,11 +1792,11 @@ ClusterHandler::add_small_controlmsg_descriptors()
//
// Move small control message data to free space after descriptors
//
- char *p = (char *) &write.msg.descriptor[write.msg.count];
+ char *p = (char *)&write.msg.descriptor[write.msg.count];
OutgoingControl *c = NULL;
while ((c = write.msg.outgoing_small_control.dequeue())) {
- *(int32_t *) p = c->len;
+ *(int32_t *)p = c->len;
p += sizeof(int32_t);
memcpy(p, c->data, c->len);
c->free_data();
@@ -1880,9 +1806,9 @@ ClusterHandler::add_small_controlmsg_descriptors()
CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - c->submit_time);
LOG_EVENT_TIME(c->submit_time, cluster_send_time_dist, cluster_send_events);
c->freeall();
- p = (char *) DOUBLE_ALIGN(p);
+ p = (char *)DOUBLE_ALIGN(p);
}
- write.msg.control_bytes = p - (char *) &write.msg.descriptor[write.msg.count];
+ write.msg.control_bytes = p - (char *)&write.msg.descriptor[write.msg.count];
#ifdef CLUSTER_STATS
_control_write_bytes += write.msg.control_bytes;
@@ -1891,14 +1817,13 @@ ClusterHandler::add_small_controlmsg_descriptors()
return 1;
}
-struct DestructorLock
-{
- DestructorLock(EThread * thread)
+struct DestructorLock {
+ DestructorLock(EThread *thread)
{
have_lock = false;
t = thread;
}
- ~DestructorLock()
+ ~DestructorLock()
{
if (have_lock && m) {
Mutex_unlock(m, t);
@@ -1911,7 +1836,7 @@ struct DestructorLock
};
int
-ClusterHandler::valid_for_data_write(ClusterVConnection * vc)
+ClusterHandler::valid_for_data_write(ClusterVConnection *vc)
{
//
// Determine if writes are allowed on this VC
@@ -1919,7 +1844,7 @@ ClusterHandler::valid_for_data_write(ClusterVConnection * vc)
ClusterVConnState *s = &vc->write;
ink_assert(!on_stolen_thread);
- ink_assert((ProxyMutex *) ! vc->write_locked);
+ ink_assert((ProxyMutex *)!vc->write_locked);
//
// Attempt to get the lock, if we miss, push vc into the future
@@ -1977,7 +1902,7 @@ retry:
if (!lock.have_lock && s->vio.mutex && s->vio._cont) {
goto retry;
} else {
- // No active VIO
+// No active VIO
#ifdef CLUSTER_STATS
_dw_no_active_vio++;
#endif
@@ -2025,7 +1950,7 @@ retry:
//
// Calculate amount writable
//
- MIOBufferAccessor & buf = s->vio.buffer;
+ MIOBufferAccessor &buf = s->vio.buffer;
int64_t towrite = buf.reader()->read_avail();
int64_t ntodo = s->vio.ntodo();
@@ -2061,8 +1986,7 @@ retry:
if (towrite && bytes_to_fill) {
consume_bytes = (towrite > bytes_to_fill) ? bytes_to_fill : towrite;
- b_list = clone_IOBufferBlockList(s->vio.buffer.reader()->block,
- s->vio.buffer.reader()->start_offset, consume_bytes, &b_tail);
+ b_list = clone_IOBufferBlockList(s->vio.buffer.reader()->block, s->vio.buffer.reader()->start_offset, consume_bytes, &b_tail);
ink_assert(b_tail);
// Append cloned IOBufferBlock list to VC write_list.
@@ -2095,13 +2019,13 @@ retry:
return 1;
} else {
if (!write_vc_signal && buf.writer()->write_avail() && towrite != ntodo)
- cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s);
+ cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s);
return 0;
}
}
int
-ClusterHandler::valid_for_freespace_write(ClusterVConnection * vc)
+ClusterHandler::valid_for_freespace_write(ClusterVConnection *vc)
{
//
// Determine if freespace messages are allowed on this VC
@@ -2156,7 +2080,7 @@ retry:
if (!lock.have_lock && s->vio.mutex && s->vio._cont) {
goto retry;
} else {
- // No active VIO
+// No active VIO
#ifdef CLUSTER_STATS
_fw_no_active_vio++;
#endif
@@ -2194,7 +2118,6 @@ retry:
int64_t bytes_to_move = vc->initial_data_bytes;
if (vc->read_block && bytes_to_move) {
-
// Push initial read data into VC
if (ntodo >= bytes_to_move) {
@@ -2231,8 +2154,7 @@ retry:
return 0;
}
}
- if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s)
- == EVENT_DONE)
+ if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) == EVENT_DONE)
return false;
if (s->vio.ntodo() <= 0)
@@ -2269,7 +2191,7 @@ retry:
}
void
-ClusterHandler::vcs_push(ClusterVConnection * vc, int type)
+ClusterHandler::vcs_push(ClusterVConnection *vc, int type)
{
if (vc->type <= VC_CLUSTER)
vc->type = type;
@@ -2284,7 +2206,7 @@ ClusterHandler::vcs_push(ClusterVConnection * vc, int type)
}
int
-ClusterHandler::remote_close(ClusterVConnection * vc, ClusterVConnState * ns)
+ClusterHandler::remote_close(ClusterVConnection *vc, ClusterVConnState *ns)
{
if (ns->vio.op != VIO::NONE && !vc->closed) {
ns->enabled = 0;
@@ -2306,17 +2228,17 @@ ClusterHandler::remote_close(ClusterVConnection * vc, ClusterVConnState * ns)
}
void
-ClusterHandler::steal_thread(EThread * t)
+ClusterHandler::steal_thread(EThread *t)
{
//
// Attempt to push the control message now instead of waiting
// for the periodic event to process it.
//
- if (t != thread && // different thread to steal
- write.to_do <= 0 && // currently not trying to send data
+ if (t != thread && // different thread to steal
+ write.to_do <= 0 && // currently not trying to send data
// nothing big outstanding
!write.msg.count) {
- mainClusterEvent(CLUSTER_EVENT_STEAL_THREAD, (Event *) t);
+ mainClusterEvent(CLUSTER_EVENT_STEAL_THREAD, (Event *)t);
}
}
@@ -2333,29 +2255,28 @@ ClusterHandler::free_locks(bool read_flag, int i)
i = write.msg.count;
}
}
- ClusterState & s = (read_flag ? read : write);
+ ClusterState &s = (read_flag ? read : write);
for (int j = 0; j < i; j++) {
if (s.msg.descriptor[j].type == CLUSTER_SEND_DATA && s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
if (VALID_CHANNEL(vc)) {
if (read_flag) {
- if ((ProxyMutex *) vc->read_locked) {
+ if ((ProxyMutex *)vc->read_locked) {
MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread);
vc->read_locked = NULL;
}
} else {
- if ((ProxyMutex *) vc->write_locked) {
+ if ((ProxyMutex *)vc->write_locked) {
MUTEX_UNTAKE_LOCK(vc->write_locked, thread);
vc->write_locked = NULL;
}
}
}
- } else if (!read_flag &&
- s.msg.descriptor[j].type == CLUSTER_SEND_FREE &&
+ } else if (!read_flag && s.msg.descriptor[j].type == CLUSTER_SEND_FREE &&
s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) {
ClusterVConnection *vc = channels[s.msg.descriptor[j].channel];
if (VALID_CHANNEL(vc)) {
- if ((ProxyMutex *) vc->read_locked) {
+ if ((ProxyMutex *)vc->read_locked) {
MUTEX_UNTAKE_LOCK(vc->read_locked, thread);
vc->read_locked = NULL;
}
@@ -2401,7 +2322,7 @@ extern int CacheClusterMonitorIntervalSecs;
// The main event for machine-machine link
//
int
-ClusterHandler::mainClusterEvent(int event, Event * e)
+ClusterHandler::mainClusterEvent(int event, Event *e)
{
// Set global time
current_time = ink_get_hrtime();
@@ -2412,15 +2333,15 @@ ClusterHandler::mainClusterEvent(int event, Event * e)
dump_internal_data();
}
}
- //
- // Note: The caller always acquires the ClusterHandler mutex prior
- // to the call. This guarantees single threaded access in
- // mainClusterEvent()
- //
+//
+// Note: The caller always acquires the ClusterHandler mutex prior
+// to the call. This guarantees single threaded access in
+// mainClusterEvent()
+//
- /////////////////////////////////////////////////////////////////////////
- // If cluster interconnect is overloaded, disable remote cluster ops.
- /////////////////////////////////////////////////////////////////////////
+/////////////////////////////////////////////////////////////////////////
+// If cluster interconnect is overloaded, disable remote cluster ops.
+/////////////////////////////////////////////////////////////////////////
#ifndef DEBUG
if (clm && ClusterLoadMonitor::cf_monitor_enabled > 0) {
#else
@@ -2446,7 +2367,7 @@ ClusterHandler::mainClusterEvent(int event, Event * e)
bool io_callback = (event == EVENT_IMMEDIATE);
if (on_stolen_thread) {
- thread = (EThread *) e;
+ thread = (EThread *)e;
} else {
if (io_callback) {
thread = this_ethread();
@@ -2499,7 +2420,7 @@ ClusterHandler::mainClusterEvent(int event, Event * e)
/////////////////////////////////////////
if (!on_stolen_thread) {
if (do_open_local_requests())
- thread->signal_hook(thread);
+ thread->signal_hook(thread);
}
}
@@ -2513,8 +2434,7 @@ ClusterHandler::mainClusterEvent(int event, Event * e)
return EVENT_CONT;
}
-int
-ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
+int ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
{
#ifdef CLUSTER_STATS
_process_read_calls++;
@@ -2528,9 +2448,8 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
///////////////////////////////
for (;;) {
-
switch (read.state) {
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_START:
///////////////////////////////////////////////
{
@@ -2546,7 +2465,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_HEADER:
///////////////////////////////////////////////
{
@@ -2561,7 +2480,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_AWAIT_HEADER:
///////////////////////////////////////////////
{
@@ -2589,11 +2508,9 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
}
} else {
#ifdef MSG_TRACE
- fprintf(t_fd,
- "[R] seqno=%d count=%d control_bytes=%d count_check=%d dsum=%d csum=%d\n",
- read.sequence_number,
- read.msg.hdr()->count, read.msg.hdr()->control_bytes,
- read.msg.hdr()->count_check, read.msg.hdr()->descriptor_cksum, read.msg.hdr()->control_bytes_cksum);
+ fprintf(t_fd, "[R] seqno=%d count=%d control_bytes=%d count_check=%d dsum=%d csum=%d\n", read.sequence_number,
+ read.msg.hdr()->count, read.msg.hdr()->control_bytes, read.msg.hdr()->count_check,
+ read.msg.hdr()->descriptor_cksum, read.msg.hdr()->control_bytes_cksum);
fflush(t_fd);
#endif
CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did);
@@ -2623,7 +2540,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
break;
}
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_SETUP_DESCRIPTOR:
///////////////////////////////////////////////
{
@@ -2637,7 +2554,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_DESCRIPTOR:
///////////////////////////////////////////////
{
@@ -2652,7 +2569,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_AWAIT_DESCRIPTOR:
///////////////////////////////////////////////
{
@@ -2698,7 +2615,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
break;
}
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_SETUP_DATA:
///////////////////////////////////////////////
{
@@ -2718,7 +2635,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_DATA:
///////////////////////////////////////////////
{
@@ -2734,7 +2651,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_AWAIT_DATA:
///////////////////////////////////////////////
{
@@ -2742,7 +2659,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
_n_read_await_data++;
#endif
if (!read.io_complete) {
- return 0; // awaiting i/o complete
+ return 0; // awaiting i/o complete
} else {
if (read.io_complete > 0) {
read.state = ClusterState::READ_POST_COMPLETE;
@@ -2754,7 +2671,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_POST_COMPLETE:
///////////////////////////////////////////////
{
@@ -2784,7 +2701,7 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
break;
}
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::READ_COMPLETE:
///////////////////////////////////////////////
{
@@ -2801,17 +2718,17 @@ ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */)
free_locks(CLUSTER_READ);
read.state = ClusterState::READ_START;
- break; // setup next read
+ break; // setup next read
}
- //////////////////
+ //////////////////
default:
//////////////////
{
ink_release_assert(!"ClusterHandler::process_read invalid state");
}
- } // end of switch
- } // end of for
+ } // end of switch
+ } // end of for
}
int
@@ -2824,9 +2741,8 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
// Cluster write state machine
/////////////////////////////////
for (;;) {
-
switch (write.state) {
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::WRITE_START:
///////////////////////////////////////////////
{
@@ -2842,7 +2758,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
write.state = ClusterState::WRITE_SETUP;
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::WRITE_SETUP:
///////////////////////////////////////////////
{
@@ -2867,7 +2783,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
if (pw_freespace_descriptors_built) {
pw_freespace_descriptors_built = build_freespace_descriptors();
}
- add_small_controlmsg_descriptors(); // always last
+ add_small_controlmsg_descriptors(); // always last
} else {
/////////////////////////////////////////////////////////////
// Build a write descriptor only containing control data.
@@ -2875,7 +2791,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
pw_write_descriptors_built = 0;
pw_freespace_descriptors_built = 0;
pw_controldata_descriptors_built = build_controlmsg_descriptors();
- add_small_controlmsg_descriptors(); // always last
+ add_small_controlmsg_descriptors(); // always last
}
// If nothing to write, post write completion
@@ -2887,7 +2803,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
control_message_write = only_write_control_msgs;
}
- // Move required data into the message header
+// Move required data into the message header
#ifdef CLUSTER_MESSAGE_CKSUM
write.msg.descriptor_cksum = write.msg.calc_descriptor_cksum();
write.msg.hdr()->descriptor_cksum = write.msg.descriptor_cksum;
@@ -2905,7 +2821,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
write.state = ClusterState::WRITE_INITIATE;
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::WRITE_INITIATE:
///////////////////////////////////////////////
{
@@ -2920,7 +2836,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::WRITE_AWAIT_COMPLETION:
///////////////////////////////////////////////
{
@@ -2954,7 +2870,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
}
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::WRITE_POST_COMPLETE:
///////////////////////////////////////////////
{
@@ -2974,7 +2890,7 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
write.state = ClusterState::WRITE_COMPLETE;
break;
}
- ///////////////////////////////////////////////
+ ///////////////////////////////////////////////
case ClusterState::WRITE_COMPLETE:
///////////////////////////////////////////////
{
@@ -2990,8 +2906,8 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
//
pw_time_expired = (curtime - now) > CLUSTER_MAX_RUN_TIME;
- if (!control_message_write && !pw_write_descriptors_built
- && !pw_freespace_descriptors_built && !pw_controldata_descriptors_built) {
+ if (!control_message_write && !pw_write_descriptors_built && !pw_freespace_descriptors_built &&
+ !pw_controldata_descriptors_built) {
// skip to the next bucket
cur_vcs = (cur_vcs + 1) % CLUSTER_BUCKETS;
}
@@ -3018,24 +2934,24 @@ ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs)
}
}
if (pw_time_expired) {
- return -1; // thread run time expired
+ return -1; // thread run time expired
} else {
if (pw_write_descriptors_built || pw_freespace_descriptors_built || pw_controldata_descriptors_built) {
- break; // start another write
+ break; // start another write
} else {
- return 0; // no more data to write
+ return 0; // no more data to write
}
}
}
- //////////////////
+ //////////////////
default:
//////////////////
{
ink_release_assert(!"ClusterHandler::process_write invalid state");
}
- } // End of switch
- } // End of for
+ } // End of switch
+ } // End of for
}
int
@@ -3059,13 +2975,12 @@ ClusterHandler::do_open_local_requests()
// move them to the local working queue while maintaining insertion order.
//
while (true) {
- cvc_ext = (ClusterVConnection *)
- ink_atomiclist_popall(&external_incoming_open_local);
+ cvc_ext = (ClusterVConnection *)ink_atomiclist_popall(&external_incoming_open_local);
if (cvc_ext == 0)
break;
while (cvc_ext) {
- cvc_ext_next = (ClusterVConnection *) cvc_ext->link.next;
+ cvc_ext_next = (ClusterVConnection *)cvc_ext->link.next;
cvc_ext->link.next = NULL;
local_incoming_open_local.push(cvc_ext);
cvc_ext = cvc_ext_next;
@@ -3089,7 +3004,7 @@ ClusterHandler::do_open_local_requests()
// unable to get mutex, insert request back onto global queue.
Debug(CL_TRACE, "do_open_local_requests() unable to acquire mutex (cvc=%p)", cvc);
pending_request = 1;
- ink_atomiclist_push(&external_incoming_open_local, (void *) cvc);
+ ink_atomiclist_push(&external_incoming_open_local, (void *)cvc);
}
}
}