You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zw...@apache.org on 2015/03/23 21:33:12 UTC
[39/52] [partial] trafficserver git commit: TS-3419 Fix some enums
such that clang-format can handle them the way we want. Basically this means
having a trailing comma
on short enums. TS-3419 Run clang-format over most of the source
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/65477944/iocore/cluster/ClusterCache.cc
----------------------------------------------------------------------
diff --git a/iocore/cluster/ClusterCache.cc b/iocore/cluster/ClusterCache.cc
index d6315d9..198d8e1 100644
--- a/iocore/cluster/ClusterCache.cc
+++ b/iocore/cluster/ClusterCache.cc
@@ -29,7 +29,7 @@
#include "P_Cluster.h"
#ifdef DEBUG
-#define CLUSTER_TEST_DEBUG 1
+#define CLUSTER_TEST_DEBUG 1
#endif
#ifdef ENABLE_TIME_TRACE
@@ -62,7 +62,7 @@ static Queue<CacheContinuation> remoteCacheContQueue[REMOTE_CONNECT_HASH];
static Ptr<ProxyMutex> remoteCacheContQueueMutex[REMOTE_CONNECT_HASH];
// 0 is an illegal sequence number
-#define CACHE_NO_RESPONSE 0
+#define CACHE_NO_RESPONSE 0
static int cluster_sequence_number = 1;
#ifdef CLUSTER_TEST_DEBUG
@@ -78,12 +78,11 @@ static CacheContinuation *find_cache_continuation(unsigned int, unsigned int);
static unsigned int new_cache_sequence_number();
-#define DOT_SEPARATED(_x) \
-((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1], \
- ((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]
+#define DOT_SEPARATED(_x) \
+ ((unsigned char *)&(_x))[0], ((unsigned char *)&(_x))[1], ((unsigned char *)&(_x))[2], ((unsigned char *)&(_x))[3]
-#define ET_CACHE_CONT_SM ET_NET
-#define ALLOW_THREAD_STEAL true
+#define ET_CACHE_CONT_SM ET_NET
+#define ALLOW_THREAD_STEAL true
/**********************************************************************/
#ifdef CACHE_MSG_TRACE
@@ -93,9 +92,8 @@ static unsigned int new_cache_sequence_number();
// Debug trace support for cache RPC messages
/**********************************************************************/
-#define MAX_TENTRIES 4096
-struct traceEntry
-{
+#define MAX_TENTRIES 4096
+struct traceEntry {
unsigned int seqno;
int op;
char *type;
@@ -132,8 +130,8 @@ dump_recvtrace_table()
int n;
printf("\n");
for (n = 0; n < MAX_TENTRIES; ++n)
- printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno,
- recvTraceTable[n].op, recvTraceTable[n].type ? recvTraceTable[n].type : "");
+ printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno, recvTraceTable[n].op,
+ recvTraceTable[n].type ? recvTraceTable[n].type : "");
}
void
@@ -142,8 +140,8 @@ dump_sndtrace_table()
int n;
printf("\n");
for (n = 0; n < MAX_TENTRIES; ++n)
- printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno,
- sndTraceTable[n].op, sndTraceTable[n].type ? sndTraceTable[n].type : "");
+ printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno, sndTraceTable[n].op,
+ sndTraceTable[n].type ? sndTraceTable[n].type : "");
}
/**********************************************************************/
@@ -165,61 +163,50 @@ dump_sndtrace_table()
class ClusterVConnectionCache
{
public:
- ClusterVConnectionCache()
- {
- memset(hash_event, 0, sizeof(hash_event));
- }
+ ClusterVConnectionCache() { memset(hash_event, 0, sizeof(hash_event)); }
void init();
- int MD5ToIndex(INK_MD5 * p);
+ int MD5ToIndex(INK_MD5 *p);
int insert(INK_MD5 *, ClusterVConnection *);
ClusterVConnection *lookup(INK_MD5 *);
public:
- struct Entry
- {
+ struct Entry {
LINK(Entry, link);
bool mark_for_delete;
INK_MD5 key;
ClusterVConnection *vc;
- Entry():mark_for_delete(0), vc(0)
- {
- }
- ~Entry()
- {
- }
+ Entry() : mark_for_delete(0), vc(0) {}
+ ~Entry() {}
};
- enum
- { MAX_TABLE_ENTRIES = 256, // must be power of 2
- SCAN_INTERVAL = 10 // seconds
+ enum {
+ MAX_TABLE_ENTRIES = 256, // must be power of 2
+ SCAN_INTERVAL = 10 // seconds
};
Queue<Entry> hash_table[MAX_TABLE_ENTRIES];
Ptr<ProxyMutex> hash_lock[MAX_TABLE_ENTRIES];
Event *hash_event[MAX_TABLE_ENTRIES];
};
-static ClassAllocator <
- ClusterVConnectionCache::Entry >
-ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry");
+static ClassAllocator<ClusterVConnectionCache::Entry> ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry");
ClusterVConnectionCache *GlobalOpenWriteVCcache = 0;
/////////////////////////////////////////////////////////////////
// Perform periodic purges of ClusterVConnectionCache entries
/////////////////////////////////////////////////////////////////
-class ClusterVConnectionCacheEvent:public Continuation
+class ClusterVConnectionCacheEvent : public Continuation
{
public:
- ClusterVConnectionCacheEvent(ClusterVConnectionCache * c, int n)
- : Continuation(new_ProxyMutex()), cache(c), hash_index(n)
+ ClusterVConnectionCacheEvent(ClusterVConnectionCache *c, int n) : Continuation(new_ProxyMutex()), cache(c), hash_index(n)
{
SET_HANDLER(&ClusterVConnectionCacheEvent::eventHandler);
}
int eventHandler(int, Event *);
private:
- ClusterVConnectionCache * cache;
+ ClusterVConnectionCache *cache;
int hash_index;
};
@@ -236,12 +223,11 @@ ClusterVConnectionCache::init()
// Setup up periodic purge events on each hash list
eh = new ClusterVConnectionCacheEvent(this, n);
- hash_event[n] =
- eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
+ hash_event[n] = eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
}
}
inline int
-ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p)
+ClusterVConnectionCache::MD5ToIndex(INK_MD5 *p)
{
uint64_t i = p->fold();
int32_t h, l;
@@ -252,7 +238,7 @@ ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p)
}
int
-ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc)
+ClusterVConnectionCache::insert(INK_MD5 *key, ClusterVConnection *vc)
{
int index = MD5ToIndex(key);
Entry *e;
@@ -262,7 +248,7 @@ ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc)
MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
if (!lock.is_locked()) {
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
- return 0; // lock miss, retry later
+ return 0; // lock miss, retry later
} else {
// Add entry to list
@@ -273,11 +259,11 @@ ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc)
hash_table[index].enqueue(e);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
}
- return 1; // Success
+ return 1; // Success
}
ClusterVConnection *
-ClusterVConnectionCache::lookup(INK_MD5 * key)
+ClusterVConnectionCache::lookup(INK_MD5 *key)
{
int index = MD5ToIndex(key);
Entry *e;
@@ -288,12 +274,12 @@ ClusterVConnectionCache::lookup(INK_MD5 * key)
MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
if (!lock.is_locked()) {
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
- return vc; // lock miss, retry later
+ return vc; // lock miss, retry later
} else {
e = hash_table[index].head;
while (e) {
- if (*key == e->key) { // Hit
+ if (*key == e->key) { // Hit
vc = e->vc;
hash_table[index].remove(e);
ClusterVCCacheEntryAlloc.free(e);
@@ -306,11 +292,11 @@ ClusterVConnectionCache::lookup(INK_MD5 * key)
}
}
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
- return (ClusterVConnection *) - 1; // Miss
+ return (ClusterVConnection *)-1; // Miss
}
int
-ClusterVConnectionCacheEvent::eventHandler(int /* event ATS_UNUSED */, Event * e)
+ClusterVConnectionCacheEvent::eventHandler(int /* event ATS_UNUSED */, Event *e)
{
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
MUTEX_TRY_LOCK(lock, cache->hash_lock[hash_index], this_ethread());
@@ -321,8 +307,8 @@ ClusterVConnectionCacheEvent::eventHandler(int /* event ATS_UNUSED */, Event * e
}
// Perform purge action on unreferenced VC(s).
- ClusterVConnectionCache::Entry * entry;
- ClusterVConnectionCache::Entry * next_entry;
+ ClusterVConnectionCache::Entry *entry;
+ ClusterVConnectionCache::Entry *next_entry;
entry = cache->hash_table[hash_index].head;
while (entry) {
@@ -372,8 +358,8 @@ CacheContinuation::init()
// Main function to do a cluster cache operation
///////////////////////////////////////////////////////////////////////
Action *
-CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
- int user_opcode, char *data, int data_len, int nbytes, MIOBuffer * b)
+CacheContinuation::do_op(Continuation *c, ClusterMachine *mp, void *args, int user_opcode, char *data, int data_len, int nbytes,
+ MIOBuffer *b)
{
CacheContinuation *cc = 0;
Action *act = 0;
@@ -410,8 +396,7 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
cc->start_time = ink_get_hrtime();
cc->from = mp;
cc->result = op_failure(opcode);
- SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
- & CacheContinuation::remoteOpEvent);
+ SET_CONTINUATION_HANDLER(cc, (CacheContHandler)&CacheContinuation::remoteOpEvent);
act = &cc->action;
// set up sequence number so we can find this continuation
@@ -424,7 +409,6 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
unsigned int hash = FOLDHASH(cc->target_ip, cc->seq_number);
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
if (!queuelock.is_locked()) {
-
// failed to acquire lock: no problem, retry later
cc->timeout = eventProcessor.schedule_in(cc, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
} else {
@@ -437,176 +421,169 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
// Determine the type of the "Over The Wire" (OTW) message header and
// initialize it.
//
- Debug("cache_msg",
- "do_op opcode=%d seqno=%d Machine=%p data=%p datalen=%d mio=%p",
- opcode, (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b);
+ Debug("cache_msg", "do_op opcode=%d seqno=%d Machine=%p data=%p datalen=%d mio=%p", opcode,
+ (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b);
switch (opcode) {
case CACHE_OPEN_WRITE_BUFFER:
- case CACHE_OPEN_WRITE_BUFFER_LONG:
- {
- ink_release_assert(!"write buffer not supported");
- break;
- }
+ case CACHE_OPEN_WRITE_BUFFER_LONG: {
+ ink_release_assert(!"write buffer not supported");
+ break;
+ }
case CACHE_OPEN_READ_BUFFER:
- case CACHE_OPEN_READ_BUFFER_LONG:
- {
- ink_release_assert(!"read buffer not supported");
- break;
- }
+ case CACHE_OPEN_READ_BUFFER_LONG: {
+ ink_release_assert(!"read buffer not supported");
+ break;
+ }
case CACHE_OPEN_WRITE:
- case CACHE_OPEN_READ:
- {
- ink_release_assert(c > 0);
- //////////////////////
- // Use short format //
- //////////////////////
- if (!data) {
- data_len = op_to_sizeof_fixedlen_msg(opcode);
- data = (char *) ALLOCA_DOUBLE(data_len);
- }
- msg = (char *) data;
- CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
- m->init();
- m->opcode = opcode;
- m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
- m->md5 = *((CacheOpArgs_General *) args)->url_md5;
- cc->url_md5 = m->md5;
- m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
- m->frag_type = ((CacheOpArgs_General *) args)->frag_type;
- if (opcode == CACHE_OPEN_WRITE) {
- m->nbytes = nbytes;
- m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
- } else {
- m->nbytes = 0;
- m->data = 0;
- }
-
- if (opcode == CACHE_OPEN_READ) {
- //
- // Set upper limit on initial data received with response
- // for open read response
- //
- m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
- } else {
- m->buffer_size = 0;
- }
+ case CACHE_OPEN_READ: {
+ ink_release_assert(c > 0);
+ //////////////////////
+ // Use short format //
+ //////////////////////
+ if (!data) {
+ data_len = op_to_sizeof_fixedlen_msg(opcode);
+ data = (char *)ALLOCA_DOUBLE(data_len);
+ }
+ msg = (char *)data;
+ CacheOpMsg_short *m = (CacheOpMsg_short *)msg;
+ m->init();
+ m->opcode = opcode;
+ m->cfl_flags = ((CacheOpArgs_General *)args)->cfl_flags;
+ m->md5 = *((CacheOpArgs_General *)args)->url_md5;
+ cc->url_md5 = m->md5;
+ m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
+ m->frag_type = ((CacheOpArgs_General *)args)->frag_type;
+ if (opcode == CACHE_OPEN_WRITE) {
+ m->nbytes = nbytes;
+ m->data = (uint32_t)((CacheOpArgs_General *)args)->pin_in_cache;
+ } else {
+ m->nbytes = 0;
+ m->data = 0;
+ }
+ if (opcode == CACHE_OPEN_READ) {
//
- // Establish the local VC
+ // Set upper limit on initial data received with response
+ // for open read response
//
- int res = setup_local_vc(msg, data_len, cc, mp, &act);
- if (!res) {
- /////////////////////////////////////////////////////
- // Unable to setup local VC, request aborted.
- // Remove request from pending list and deallocate.
- /////////////////////////////////////////////////////
- cc->remove_and_delete(0, (Event *) 0);
- return act;
-
- } else if (res != -1) {
- ///////////////////////////////////////
- // VC established, send request
- ///////////////////////////////////////
- break;
+ m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
+ } else {
+ m->buffer_size = 0;
+ }
- } else {
- //////////////////////////////////////////////////////
- // Unable to setup VC, delay required, await callback
- //////////////////////////////////////////////////////
- goto no_send_exit;
- }
+ //
+ // Establish the local VC
+ //
+ int res = setup_local_vc(msg, data_len, cc, mp, &act);
+ if (!res) {
+ /////////////////////////////////////////////////////
+ // Unable to setup local VC, request aborted.
+ // Remove request from pending list and deallocate.
+ /////////////////////////////////////////////////////
+ cc->remove_and_delete(0, (Event *)0);
+ return act;
+
+ } else if (res != -1) {
+ ///////////////////////////////////////
+ // VC established, send request
+ ///////////////////////////////////////
+ break;
+
+ } else {
+ //////////////////////////////////////////////////////
+ // Unable to setup VC, delay required, await callback
+ //////////////////////////////////////////////////////
+ goto no_send_exit;
}
+ }
case CACHE_OPEN_READ_LONG:
- case CACHE_OPEN_WRITE_LONG:
- {
- ink_release_assert(c > 0);
- //////////////////////
- // Use long format //
- //////////////////////
- msg = data;
- CacheOpMsg_long *m = (CacheOpMsg_long *) msg;
- m->init();
- m->opcode = opcode;
- m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
- m->url_md5 = *((CacheOpArgs_General *) args)->url_md5;
- cc->url_md5 = m->url_md5;
- m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
- m->nbytes = nbytes;
- m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
- m->frag_type = (uint32_t) ((CacheOpArgs_General *) args)->frag_type;
-
- if (opcode == CACHE_OPEN_READ_LONG) {
- //
- // Set upper limit on initial data received with response
- // for open read response
- //
- m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
- } else {
- m->buffer_size = 0;
- }
+ case CACHE_OPEN_WRITE_LONG: {
+ ink_release_assert(c > 0);
+ //////////////////////
+ // Use long format //
+ //////////////////////
+ msg = data;
+ CacheOpMsg_long *m = (CacheOpMsg_long *)msg;
+ m->init();
+ m->opcode = opcode;
+ m->cfl_flags = ((CacheOpArgs_General *)args)->cfl_flags;
+ m->url_md5 = *((CacheOpArgs_General *)args)->url_md5;
+ cc->url_md5 = m->url_md5;
+ m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
+ m->nbytes = nbytes;
+ m->data = (uint32_t)((CacheOpArgs_General *)args)->pin_in_cache;
+ m->frag_type = (uint32_t)((CacheOpArgs_General *)args)->frag_type;
+
+ if (opcode == CACHE_OPEN_READ_LONG) {
//
- // Establish the local VC
+ // Set upper limit on initial data received with response
+ // for open read response
//
- int res = setup_local_vc(msg, data_len, cc, mp, &act);
- if (!res) {
- /////////////////////////////////////////////////////
- // Unable to setup local VC, request aborted.
- // Remove request from pending list and deallocate.
- /////////////////////////////////////////////////////
- cc->remove_and_delete(0, (Event *) 0);
- return act;
-
- } else if (res != -1) {
- ///////////////////////////////////////
- // VC established, send request
- ///////////////////////////////////////
- break;
+ m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
+ } else {
+ m->buffer_size = 0;
+ }
+ //
+ // Establish the local VC
+ //
+ int res = setup_local_vc(msg, data_len, cc, mp, &act);
+ if (!res) {
+ /////////////////////////////////////////////////////
+ // Unable to setup local VC, request aborted.
+ // Remove request from pending list and deallocate.
+ /////////////////////////////////////////////////////
+ cc->remove_and_delete(0, (Event *)0);
+ return act;
- } else {
- //////////////////////////////////////////////////////
- // Unable to setup VC, delay required, await callback
- //////////////////////////////////////////////////////
- goto no_send_exit;
- }
+ } else if (res != -1) {
+ ///////////////////////////////////////
+ // VC established, send request
+ ///////////////////////////////////////
+ break;
+
+ } else {
+ //////////////////////////////////////////////////////
+ // Unable to setup VC, delay required, await callback
+ //////////////////////////////////////////////////////
+ goto no_send_exit;
}
+ }
case CACHE_UPDATE:
case CACHE_REMOVE:
- case CACHE_DEREF:
- {
- //////////////////////
- // Use short format //
- //////////////////////
- msg = data;
- CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
- m->init();
- m->opcode = opcode;
- m->frag_type = ((CacheOpArgs_Deref *) args)->frag_type;
- m->cfl_flags = ((CacheOpArgs_Deref *) args)->cfl_flags;
- if (opcode == CACHE_DEREF)
- m->md5 = *((CacheOpArgs_Deref *) args)->md5;
- else
- m->md5 = *((CacheOpArgs_General *) args)->url_md5;
- m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
- break;
- }
- case CACHE_LINK:
- {
- ////////////////////////
- // Use short_2 format //
- ////////////////////////
- msg = data;
- CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *) msg;
- m->init();
- m->opcode = opcode;
- m->cfl_flags = ((CacheOpArgs_Link *) args)->cfl_flags;
- m->md5_1 = *((CacheOpArgs_Link *) args)->from;
- m->md5_2 = *((CacheOpArgs_Link *) args)->to;
- m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
- m->frag_type = ((CacheOpArgs_Link *) args)->frag_type;
- break;
- }
+ case CACHE_DEREF: {
+ //////////////////////
+ // Use short format //
+ //////////////////////
+ msg = data;
+ CacheOpMsg_short *m = (CacheOpMsg_short *)msg;
+ m->init();
+ m->opcode = opcode;
+ m->frag_type = ((CacheOpArgs_Deref *)args)->frag_type;
+ m->cfl_flags = ((CacheOpArgs_Deref *)args)->cfl_flags;
+ if (opcode == CACHE_DEREF)
+ m->md5 = *((CacheOpArgs_Deref *)args)->md5;
+ else
+ m->md5 = *((CacheOpArgs_General *)args)->url_md5;
+ m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
+ break;
+ }
+ case CACHE_LINK: {
+ ////////////////////////
+ // Use short_2 format //
+ ////////////////////////
+ msg = data;
+ CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *)msg;
+ m->init();
+ m->opcode = opcode;
+ m->cfl_flags = ((CacheOpArgs_Link *)args)->cfl_flags;
+ m->md5_1 = *((CacheOpArgs_Link *)args)->from;
+ m->md5_2 = *((CacheOpArgs_Link *)args)->to;
+ m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
+ m->frag_type = ((CacheOpArgs_Link *)args)->frag_type;
+ break;
+ }
default:
msg = 0;
break;
@@ -614,20 +591,19 @@ CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op");
#endif
- clusterProcessor.invoke_remote(ch,
- op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION
- : CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len);
+ clusterProcessor.invoke_remote(
+ ch, op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION : CACHE_OP_CLUSTER_FUNCTION, (char *)msg, data_len);
no_send_exit:
if (c) {
return act;
} else {
- return (Action *) 0;
+ return (Action *)0;
}
}
int
-CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action ** act)
+CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation *cc, ClusterMachine *mp, Action **act)
{
bool read_op = op_is_read(cc->request_opcode);
bool short_msg = op_is_shortform(cc->request_opcode);
@@ -637,13 +613,12 @@ CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation *
cc->allocMsgBuffer();
memcpy(cc->getMsgBuffer(), data, data_len);
- SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
- & CacheContinuation::localVCsetupEvent);
+ SET_CONTINUATION_HANDLER(cc, (CacheContHandler)&CacheContinuation::localVCsetupEvent);
if (short_msg) {
- Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
+ Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *)data)->seq_number);
} else {
- Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
+ Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *)data)->seq_number);
}
// Create local VC
@@ -655,17 +630,14 @@ CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation *
} else {
vc = clusterProcessor.open_local(cc, mp, cc->open_local_token,
- (CLUSTER_OPT_ALLOW_IMMEDIATE |
- (read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE)));
+ (CLUSTER_OPT_ALLOW_IMMEDIATE | (read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE)));
}
if (!vc) {
// Error, abort request
if (short_msg) {
- Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d",
- (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
+ Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *)data)->seq_number);
} else {
- Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d",
- (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
+ Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *)data)->seq_number);
}
cc->freeMsgBuffer();
if (cc->timeout)
@@ -687,23 +659,20 @@ CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation *
vc->current_cont = cc;
if (short_msg) {
- CacheOpMsg_short *ms = (CacheOpMsg_short *) data;
+ CacheOpMsg_short *ms = (CacheOpMsg_short *)data;
ms->channel = vc->channel;
ms->token = cc->open_local_token;
- Debug("cache_proto",
- "0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
- (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
+ Debug("cache_proto", "0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", (read_op ? "R" : "W"), ms->seq_number,
+ vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
} else {
- CacheOpMsg_long *ml = (CacheOpMsg_long *) data;
+ CacheOpMsg_long *ml = (CacheOpMsg_long *)data;
ml->channel = vc->channel;
ml->token = cc->open_local_token;
- Debug("cache_proto",
- "1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
- (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
+ Debug("cache_proto", "1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", (read_op ? "R" : "W"), ml->seq_number,
+ vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
}
cc->freeMsgBuffer();
- SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
- & CacheContinuation::remoteOpEvent);
+ SET_CONTINUATION_HANDLER(cc, (CacheContHandler)&CacheContinuation::remoteOpEvent);
return 1;
} else {
@@ -723,14 +692,13 @@ CacheContinuation::lookupOpenWriteVC()
// failed.
///////////////////////////////////////////////////////////////
ClusterVConnection *vc;
- CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
+ CacheOpMsg_long *ml = (CacheOpMsg_long *)getMsgBuffer();
vc = GlobalOpenWriteVCcache->lookup(&ml->url_md5);
- if (vc == ((ClusterVConnection *) 0)) {
+ if (vc == ((ClusterVConnection *)0)) {
// Retry lookup
- SET_CONTINUATION_HANDLER(this, (CacheContHandler)
- & CacheContinuation::lookupOpenWriteVCEvent);
+ SET_CONTINUATION_HANDLER(this, (CacheContHandler)&CacheContinuation::lookupOpenWriteVCEvent);
//
// Note: In the lookupOpenWriteVCEvent handler, we use EVENT_IMMEDIATE
// to distinguish the lookup retry from a request timeout
@@ -738,15 +706,14 @@ CacheContinuation::lookupOpenWriteVC()
//
lookup_open_write_vc_event = eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
- } else if (vc != ((ClusterVConnection *) - 1)) {
+ } else if (vc != ((ClusterVConnection *)-1)) {
// Hit, found open_write VC in cache.
// Post open_write completion by simulating a
// remote cache op result message.
- vc->action_ = action; // establish new continuation
+ vc->action_ = action; // establish new continuation
- SET_CONTINUATION_HANDLER(this, (CacheContHandler)
- & CacheContinuation::localVCsetupEvent);
+ SET_CONTINUATION_HANDLER(this, (CacheContHandler)&CacheContinuation::localVCsetupEvent);
this->handleEvent(CLUSTER_EVENT_OPEN_EXISTS, vc);
CacheOpReplyMsg msg;
@@ -757,15 +724,13 @@ CacheContinuation::lookupOpenWriteVC()
msg.seq_number = seq_number;
msg.token = vc->token;
- cache_op_result_ClusterFunction(ch, (void *) &msg, msglen);
+ cache_op_result_ClusterFunction(ch, (void *)&msg, msglen);
} else {
// Miss, establish local VC and send remote open_write request
- SET_CONTINUATION_HANDLER(this, (CacheContHandler)
- & CacheContinuation::localVCsetupEvent);
- vc = clusterProcessor.open_local(this, from, open_local_token,
- (CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
+ SET_CONTINUATION_HANDLER(this, (CacheContHandler)&CacheContinuation::localVCsetupEvent);
+ vc = clusterProcessor.open_local(this, from, open_local_token, (CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
if (!vc) {
this->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);
@@ -773,11 +738,11 @@ CacheContinuation::lookupOpenWriteVC()
this->handleEvent(CLUSTER_EVENT_OPEN, vc);
}
}
- return CLUSTER_DELAYED_OPEN; // force completion in callback
+ return CLUSTER_DELAYED_OPEN; // force completion in callback
}
int
-CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e)
+CacheContinuation::lookupOpenWriteVCEvent(int event, Event *e)
{
if (event == EVENT_IMMEDIATE) {
// Retry open_write VC lookup
@@ -785,15 +750,14 @@ CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e)
} else {
lookup_open_write_vc_event->cancel();
- SET_CONTINUATION_HANDLER(this, (CacheContHandler)
- & CacheContinuation::localVCsetupEvent);
+ SET_CONTINUATION_HANDLER(this, (CacheContHandler)&CacheContinuation::localVCsetupEvent);
this->handleEvent(event, e);
}
return EVENT_DONE;
}
int
-CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event * e)
+CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event *e)
{
unsigned int hash = FOLDHASH(target_ip, seq_number);
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
@@ -808,7 +772,7 @@ CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event * e)
cacheContAllocator_free(this);
} else {
- SET_HANDLER((CacheContHandler) & CacheContinuation::remove_and_delete);
+ SET_HANDLER((CacheContHandler)&CacheContinuation::remove_and_delete);
if (!e) {
timeout = eventProcessor.schedule_in(this, cache_cluster_timeout, ET_CACHE_CONT_SM);
} else {
@@ -819,15 +783,15 @@ CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event * e)
}
int
-CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
+CacheContinuation::localVCsetupEvent(int event, ClusterVConnection *vc)
{
- ink_assert(magicno == (int) MagicNo);
+ ink_assert(magicno == (int)MagicNo);
ink_assert(getMsgBuffer());
bool short_msg = op_is_shortform(request_opcode);
bool read_op = op_is_read(request_opcode);
if (event == EVENT_INTERVAL) {
- Event *e = (Event *) vc;
+ Event *e = (Event *)vc;
unsigned int hash = FOLDHASH(target_ip, seq_number);
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
@@ -854,7 +818,7 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
MUTEX_RELEASE(queuelock);
Debug("cluster_timeout", "0cluster op timeout %d", seq_number);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
- timeout = (Event *) 1; // Note timeout
+ timeout = (Event *)1; // Note timeout
/////////////////////////////////////////////////////////////////
// Note: Failure callback is sent now, but the deallocation of
// the CacheContinuation is deferred until we receive the
@@ -865,8 +829,8 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
return EVENT_DONE;
}
- } else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS))
- && (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0)) {
+ } else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS)) &&
+ (((ptrdiff_t)timeout & (ptrdiff_t)1) == 0)) {
ink_hrtime now;
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT, now - start_time);
@@ -880,43 +844,40 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
vc->current_cont = this;
if (short_msg) {
- CacheOpMsg_short *ms = (CacheOpMsg_short *) getMsgBuffer();
+ CacheOpMsg_short *ms = (CacheOpMsg_short *)getMsgBuffer();
ms->channel = vc->channel;
ms->token = open_local_token;
- Debug("cache_proto",
- "2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
- (read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
+ Debug("cache_proto", "2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", (read_op ? "R" : "W"), ms->seq_number,
+ vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
} else {
- CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
+ CacheOpMsg_long *ml = (CacheOpMsg_long *)getMsgBuffer();
ml->channel = vc->channel;
ml->token = open_local_token;
- Debug("cache_proto",
- "3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
- (read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
+ Debug("cache_proto", "3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p", (read_op ? "R" : "W"), ml->seq_number,
+ vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
}
- SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent);
+ SET_HANDLER((CacheContHandler)&CacheContinuation::remoteOpEvent);
if (event != CLUSTER_EVENT_OPEN_EXISTS) {
// Send request message
- clusterProcessor.invoke_remote(ch,
- (op_needs_marshalled_coi(request_opcode) ?
- CACHE_OP_MALLOCED_CLUSTER_FUNCTION :
- CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen());
+ clusterProcessor.invoke_remote(
+ ch, (op_needs_marshalled_coi(request_opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION : CACHE_OP_CLUSTER_FUNCTION),
+ (char *)getMsgBuffer(), getMsgBufferLen());
}
} else {
int send_failure_callback = 1;
- if (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0) {
+ if (((ptrdiff_t)timeout & (ptrdiff_t)1) == 0) {
if (short_msg) {
- Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d",
- (read_op ? "R" : "W"), ((CacheOpMsg_short *) getMsgBuffer())->seq_number);
+ Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d", (read_op ? "R" : "W"),
+ ((CacheOpMsg_short *)getMsgBuffer())->seq_number);
} else {
- Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d",
- (read_op ? "R" : "W"), ((CacheOpMsg_long *) getMsgBuffer())->seq_number);
+ Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d", (read_op ? "R" : "W"),
+ ((CacheOpMsg_long *)getMsgBuffer())->seq_number);
}
} else {
@@ -927,10 +888,10 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
if (event == CLUSTER_EVENT_OPEN) {
vc->pending_remote_fill = 0;
- vc->remote_closed = 1; // avoid remote close msg
+ vc->remote_closed = 1; // avoid remote close msg
vc->do_io(VIO::CLOSE);
}
- send_failure_callback = 0; // already sent.
+ send_failure_callback = 0; // already sent.
}
if (this->timeout)
@@ -947,7 +908,7 @@ CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
this->use_deferred_callback = true;
this->result = (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED);
this->result_error = 0;
- remove_and_delete(0, (Event *) 0);
+ remove_and_delete(0, (Event *)0);
} else {
cacheContAllocator_free(this);
@@ -973,29 +934,29 @@ inline CacheOpMsg_long *
unmarshal_CacheOpMsg_long(void *data, int NeedByteSwap)
{
if (NeedByteSwap)
- ((CacheOpMsg_long *) data)->SwapBytes();
- return (CacheOpMsg_long *) data;
+ ((CacheOpMsg_long *)data)->SwapBytes();
+ return (CacheOpMsg_long *)data;
}
inline CacheOpMsg_short *
unmarshal_CacheOpMsg_short(void *data, int NeedByteSwap)
{
if (NeedByteSwap)
- ((CacheOpMsg_short *) data)->SwapBytes();
- return (CacheOpMsg_short *) data;
+ ((CacheOpMsg_short *)data)->SwapBytes();
+ return (CacheOpMsg_short *)data;
}
inline CacheOpMsg_short_2 *
unmarshal_CacheOpMsg_short_2(void *data, int NeedByteSwap)
{
if (NeedByteSwap)
- ((CacheOpMsg_short_2 *) data)->SwapBytes();
- return (CacheOpMsg_short_2 *) data;
+ ((CacheOpMsg_short_2 *)data)->SwapBytes();
+ return (CacheOpMsg_short_2 *)data;
}
// init_from_long() support routine for cache_op_ClusterFunction()
inline void
-init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * m)
+init_from_long(CacheContinuation *cont, CacheOpMsg_long *msg, ClusterMachine *m)
{
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
cont->seq_number = msg->seq_number;
@@ -1003,15 +964,14 @@ init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine *
cont->from = m;
cont->url_md5 = msg->url_md5;
cont->cluster_vc_channel = msg->channel;
- cont->frag_type = (CacheFragType) msg->frag_type;
- if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG)
- || (cont->request_opcode == CACHE_OPEN_READ_LONG)) {
- cont->pin_in_cache = (time_t) msg->data;
+ cont->frag_type = (CacheFragType)msg->frag_type;
+ if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG) || (cont->request_opcode == CACHE_OPEN_READ_LONG)) {
+ cont->pin_in_cache = (time_t)msg->data;
} else {
cont->pin_in_cache = 0;
}
cont->token = msg->token;
- cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
+ cont->nbytes = (((int)msg->nbytes < 0) ? 0 : msg->nbytes);
if (cont->request_opcode == CACHE_OPEN_READ_LONG) {
cont->caller_buf_freebytes = msg->buffer_size;
@@ -1022,7 +982,7 @@ init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine *
// init_from_short() support routine for cache_op_ClusterFunction()
inline void
-init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine * m)
+init_from_short(CacheContinuation *cont, CacheOpMsg_short *msg, ClusterMachine *m)
{
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
cont->seq_number = msg->seq_number;
@@ -1031,11 +991,11 @@ init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine
cont->url_md5 = msg->md5;
cont->cluster_vc_channel = msg->channel;
cont->token = msg->token;
- cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
- cont->frag_type = (CacheFragType) msg->frag_type;
+ cont->nbytes = (((int)msg->nbytes < 0) ? 0 : msg->nbytes);
+ cont->frag_type = (CacheFragType)msg->frag_type;
if (cont->request_opcode == CACHE_OPEN_WRITE) {
- cont->pin_in_cache = (time_t) msg->data;
+ cont->pin_in_cache = (time_t)msg->data;
} else {
cont->pin_in_cache = 0;
}
@@ -1049,18 +1009,18 @@ init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine
// init_from_short_2() support routine for cache_op_ClusterFunction()
inline void
-init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMachine * m)
+init_from_short_2(CacheContinuation *cont, CacheOpMsg_short_2 *msg, ClusterMachine *m)
{
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
cont->seq_number = msg->seq_number;
cont->cfl_flags = msg->cfl_flags;
cont->from = m;
cont->url_md5 = msg->md5_1;
- cont->frag_type = (CacheFragType) msg->frag_type;
+ cont->frag_type = (CacheFragType)msg->frag_type;
}
void
-cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
+cache_op_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
@@ -1070,14 +1030,14 @@ cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
int opcode;
- ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
+ ClusterMessageHeader *mh = (ClusterMessageHeader *)data;
- if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) { ////////////////////////////////////////////////
+ if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) { ////////////////////////////////////////////////
// Convert from old to current message format
////////////////////////////////////////////////
ink_release_assert(!"cache_op_ClusterFunction() bad msg version");
}
- opcode = ((CacheOpMsg_long *) data)->opcode;
+ opcode = ((CacheOpMsg_long *)data)->opcode;
// If necessary, create a continuation to reflect the response back
@@ -1088,8 +1048,7 @@ cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
c->token.clear();
c->start_time = ink_get_hrtime();
c->ch = ch;
- SET_CONTINUATION_HANDLER(c, (CacheContHandler)
- & CacheContinuation::replyOpEvent);
+ SET_CONTINUATION_HANDLER(c, (CacheContHandler)&CacheContinuation::replyOpEvent);
switch (opcode) {
case CACHE_OPEN_WRITE_BUFFER:
@@ -1102,362 +1061,320 @@ cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
ink_release_assert(!"cache_op_ClusterFunction READ_BUFFER not supported");
break;
- case CACHE_OPEN_READ:
- {
- CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
- init_from_short(c, msg, ch->machine);
- Debug("cache_msg",
- "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
- //
- // Establish the remote side of the ClusterVConnection
- //
- c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
- &c->token,
- c->cluster_vc_channel,
- (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
- if (!c->write_cluster_vc) {
- // Unable to setup channel, abort processing.
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
- Debug("chan_inuse",
- "1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
- c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
-
- // Send cluster op failed reply
- c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
- break;
+ case CACHE_OPEN_READ: {
+ CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
+ init_from_short(c, msg, ch->machine);
+ Debug("cache_msg", "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
+ //
+ // Establish the remote side of the ClusterVConnection
+ //
+ c->write_cluster_vc = clusterProcessor.connect_local((Continuation *)0, &c->token, c->cluster_vc_channel,
+ (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
+ if (!c->write_cluster_vc) {
+ // Unable to setup channel, abort processing.
+ CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
+ Debug("chan_inuse", "1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", c->cluster_vc_channel,
+ DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
+
+ // Send cluster op failed reply
+ c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *)-ECLUSTER_CHANNEL_INUSE);
+ break;
- } else {
- c->write_cluster_vc->current_cont = c;
- }
- ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
- ink_release_assert((opcode == CACHE_OPEN_READ)
- || c->write_cluster_vc->pending_remote_fill);
-
- SET_CONTINUATION_HANDLER(c, (CacheContHandler)
- & CacheContinuation::setupVCdataRead);
- Debug("cache_proto",
- "0read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
- msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
+ } else {
+ c->write_cluster_vc->current_cont = c;
+ }
+ ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
+ ink_release_assert((opcode == CACHE_OPEN_READ) || c->write_cluster_vc->pending_remote_fill);
+
+ SET_CONTINUATION_HANDLER(c, (CacheContHandler)&CacheContinuation::setupVCdataRead);
+ Debug("cache_proto", "0read op, seqno=%d chan=%d bufsize=%d token=%d,%d", msg->seq_number, msg->channel, msg->buffer_size,
+ msg->token.ip_created, msg->token.sequence_number);
#ifdef CACHE_MSG_TRACE
- log_cache_op_msg(msg->seq_number, len, "cache_op_open_read");
+ log_cache_op_msg(msg->seq_number, len, "cache_op_open_read");
#endif
- CacheKey key(msg->md5);
+ CacheKey key(msg->md5);
- char *hostname = NULL;
- int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
- if (host_len) {
- hostname = (char *) msg->moi.byte;
- }
- Cache *call_cache = caches[c->frag_type];
- c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len);
- break;
+ char *hostname = NULL;
+ int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
+ if (host_len) {
+ hostname = (char *)msg->moi.byte;
}
- case CACHE_OPEN_READ_LONG:
- {
- // Cache needs message data, copy it.
- c->setMsgBufferLen(len);
- c->allocMsgBuffer();
- memcpy(c->getMsgBuffer(), (char *) data, len);
-
- int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
- CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
- init_from_long(c, msg, ch->machine);
- Debug("cache_msg",
- "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
+ Cache *call_cache = caches[c->frag_type];
+ c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len);
+ break;
+ }
+ case CACHE_OPEN_READ_LONG: {
+ // Cache needs message data, copy it.
+ c->setMsgBufferLen(len);
+ c->allocMsgBuffer();
+ memcpy(c->getMsgBuffer(), (char *)data, len);
+
+ int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
+ CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
+ init_from_long(c, msg, ch->machine);
+ Debug("cache_msg", "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
- log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
+ log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
#endif
- //
- // Establish the remote side of the ClusterVConnection
- //
- c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
- &c->token,
- c->cluster_vc_channel,
- (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
- if (!c->write_cluster_vc) {
- // Unable to setup channel, abort processing.
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
- Debug("chan_inuse",
- "2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
- c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
-
- // Send cluster op failed reply
- c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
- break;
+ //
+ // Establish the remote side of the ClusterVConnection
+ //
+ c->write_cluster_vc = clusterProcessor.connect_local((Continuation *)0, &c->token, c->cluster_vc_channel,
+ (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
+ if (!c->write_cluster_vc) {
+ // Unable to setup channel, abort processing.
+ CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
+ Debug("chan_inuse", "2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", c->cluster_vc_channel,
+ DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
+
+ // Send cluster op failed reply
+ c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *)-ECLUSTER_CHANNEL_INUSE);
+ break;
- } else {
- c->write_cluster_vc->current_cont = c;
- }
- ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
- ink_release_assert((opcode == CACHE_OPEN_READ_LONG)
- || c->write_cluster_vc->pending_remote_fill);
-
- SET_CONTINUATION_HANDLER(c, (CacheContHandler)
- & CacheContinuation::setupReadWriteVC);
- Debug("cache_proto",
- "1read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
- msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
-
- const char *p = (const char *) msg + flen;
- int moi_len = len - flen;
- int res;
+ } else {
+ c->write_cluster_vc->current_cont = c;
+ }
+ ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
+ ink_release_assert((opcode == CACHE_OPEN_READ_LONG) || c->write_cluster_vc->pending_remote_fill);
- ink_assert(moi_len > 0);
+ SET_CONTINUATION_HANDLER(c, (CacheContHandler)&CacheContinuation::setupReadWriteVC);
+ Debug("cache_proto", "1read op, seqno=%d chan=%d bufsize=%d token=%d,%d", msg->seq_number, msg->channel, msg->buffer_size,
+ msg->token.ip_created, msg->token.sequence_number);
- // Unmarshal CacheHTTPHdr
- res = c->ic_request.unmarshal((char *) p, moi_len, NULL);
- ink_assert(res > 0);
- ink_assert(c->ic_request.valid());
- c->request_purge = c->ic_request.method_get_wksidx() == HTTP_WKSIDX_PURGE || c->ic_request.method_get_wksidx() == HTTP_WKSIDX_DELETE;
- moi_len -= res;
- p += res;
- ink_assert(moi_len > 0);
- // Unmarshal CacheLookupHttpConfig
- c->ic_params = new(CacheLookupHttpConfigAllocator.alloc())
- CacheLookupHttpConfig();
- res = c->ic_params->unmarshal(&c->ic_arena, (const char *) p, moi_len);
- ink_assert(res > 0);
+ const char *p = (const char *)msg + flen;
+ int moi_len = len - flen;
+ int res;
- moi_len -= res;
- p += res;
+ ink_assert(moi_len > 0);
- CacheKey key(msg->url_md5);
+ // Unmarshal CacheHTTPHdr
+ res = c->ic_request.unmarshal((char *)p, moi_len, NULL);
+ ink_assert(res > 0);
+ ink_assert(c->ic_request.valid());
+ c->request_purge =
+ c->ic_request.method_get_wksidx() == HTTP_WKSIDX_PURGE || c->ic_request.method_get_wksidx() == HTTP_WKSIDX_DELETE;
+ moi_len -= res;
+ p += res;
+ ink_assert(moi_len > 0);
+ // Unmarshal CacheLookupHttpConfig
+ c->ic_params = new (CacheLookupHttpConfigAllocator.alloc()) CacheLookupHttpConfig();
+ res = c->ic_params->unmarshal(&c->ic_arena, (const char *)p, moi_len);
+ ink_assert(res > 0);
- char *hostname = NULL;
- int host_len = 0;
+ moi_len -= res;
+ p += res;
- if (moi_len) {
- hostname = (char *) p;
- host_len = moi_len;
+ CacheKey key(msg->url_md5);
- // Save hostname and attach it to the continuation since we may
- // need it if we convert this to an open_write.
+ char *hostname = NULL;
+ int host_len = 0;
- c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len));
- c->ic_hostname_len = host_len;
+ if (moi_len) {
+ hostname = (char *)p;
+ host_len = moi_len;
- memcpy(c->ic_hostname->data(), hostname, host_len);
- }
+ // Save hostname and attach it to the continuation since we may
+ // need it if we convert this to an open_write.
- Cache *call_cache = caches[c->frag_type];
- Action *a = call_cache->open_read(c, &key, &c->ic_request,
- c->ic_params,
- c->frag_type, hostname, host_len);
- // Get rid of purify warnings since 'c' can be freed by open_read.
- if (a != ACTION_RESULT_DONE) {
- c->cache_action = a;
- }
- break;
+ c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len));
+ c->ic_hostname_len = host_len;
+
+ memcpy(c->ic_hostname->data(), hostname, host_len);
}
- case CACHE_OPEN_WRITE:
- {
- CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
- init_from_short(c, msg, ch->machine);
- Debug("cache_msg",
- "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
+
+ Cache *call_cache = caches[c->frag_type];
+ Action *a = call_cache->open_read(c, &key, &c->ic_request, c->ic_params, c->frag_type, hostname, host_len);
+ // Get rid of purify warnings since 'c' can be freed by open_read.
+ if (a != ACTION_RESULT_DONE) {
+ c->cache_action = a;
+ }
+ break;
+ }
+ case CACHE_OPEN_WRITE: {
+ CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
+ init_from_short(c, msg, ch->machine);
+ Debug("cache_msg", "cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
- log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
+ log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
#endif
- //
- // Establish the remote side of the ClusterVConnection
- //
- c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
- &c->token,
- c->cluster_vc_channel,
- (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
- if (!c->read_cluster_vc) {
- // Unable to setup channel, abort processing.
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
- Debug("chan_inuse",
- "3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
- c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
-
- // Send cluster op failed reply
- c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
- break;
+ //
+ // Establish the remote side of the ClusterVConnection
+ //
+ c->read_cluster_vc = clusterProcessor.connect_local((Continuation *)0, &c->token, c->cluster_vc_channel,
+ (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
+ if (!c->read_cluster_vc) {
+ // Unable to setup channel, abort processing.
+ CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
+ Debug("chan_inuse", "3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", c->cluster_vc_channel,
+ DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
+
+ // Send cluster op failed reply
+ c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *)-ECLUSTER_CHANNEL_INUSE);
+ break;
- } else {
- c->read_cluster_vc->current_cont = c;
- }
- ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
+ } else {
+ c->read_cluster_vc->current_cont = c;
+ }
+ ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
- CacheKey key(msg->md5);
+ CacheKey key(msg->md5);
- char *hostname = NULL;
- int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
- if (host_len) {
- hostname = (char *) msg->moi.byte;
- }
+ char *hostname = NULL;
+ int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
+ if (host_len) {
+ hostname = (char *)msg->moi.byte;
+ }
- Cache *call_cache = caches[c->frag_type];
- Action *a = call_cache->open_write(c, &key, c->frag_type,
- !!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE),
- c->pin_in_cache, hostname, host_len);
- if (a != ACTION_RESULT_DONE) {
- c->cache_action = a;
- }
- break;
+ Cache *call_cache = caches[c->frag_type];
+ Action *a =
+ call_cache->open_write(c, &key, c->frag_type, !!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE), c->pin_in_cache, hostname, host_len);
+ if (a != ACTION_RESULT_DONE) {
+ c->cache_action = a;
}
- case CACHE_OPEN_WRITE_LONG:
- {
- // Cache needs message data, copy it.
- c->setMsgBufferLen(len);
- c->allocMsgBuffer();
- memcpy(c->getMsgBuffer(), (char *) data, len);
-
- int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
- CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
- init_from_long(c, msg, ch->machine);
- Debug("cache_msg",
- "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
+ break;
+ }
+ case CACHE_OPEN_WRITE_LONG: {
+ // Cache needs message data, copy it.
+ c->setMsgBufferLen(len);
+ c->allocMsgBuffer();
+ memcpy(c->getMsgBuffer(), (char *)data, len);
+
+ int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
+ CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
+ init_from_long(c, msg, ch->machine);
+ Debug("cache_msg", "cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
- log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
+ log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
#endif
- //
- // Establish the remote side of the ClusterVConnection
- //
- c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
- &c->token,
- c->cluster_vc_channel,
- (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
- if (!c->read_cluster_vc) {
- // Unable to setup channel, abort processing.
- CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
- Debug("chan_inuse",
- "4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
- c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
-
- // Send cluster op failed reply
- c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
- break;
-
- } else {
- c->read_cluster_vc->current_cont = c;
- }
- ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
+ //
+ // Establish the remote side of the ClusterVConnection
+ //
+ c->read_cluster_vc = clusterProcessor.connect_local((Continuation *)0, &c->token, c->cluster_vc_channel,
+ (CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
+ if (!c->read_cluster_vc) {
+ // Unable to setup channel, abort processing.
+ CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
+ Debug("chan_inuse", "4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d", c->cluster_vc_channel,
+ DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
+
+ // Send cluster op failed reply
+ c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *)-ECLUSTER_CHANNEL_INUSE);
+ break;
- CacheHTTPInfo *ci = 0;
- const char *p = (const char *) msg + flen;
- int res = 0;
- int moi_len = len - flen;
+ } else {
+ c->read_cluster_vc->current_cont = c;
+ }
+ ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
- if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) {
+ CacheHTTPInfo *ci = 0;
+ const char *p = (const char *)msg + flen;
+ int res = 0;
+ int moi_len = len - flen;
- // Unmarshal old CacheHTTPInfo
- res = HTTPInfo::unmarshal((char *) p, moi_len, NULL);
- ink_assert(res > 0);
- c->ic_old_info.get_handle((char *) p, moi_len);
- ink_assert(c->ic_old_info.valid());
- ci = &c->ic_old_info;
- }
- if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) {
- ink_assert(!ci);
- ci = (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES;
- }
- moi_len -= res;
- p += res;
+ if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) {
+ // Unmarshal old CacheHTTPInfo
+ res = HTTPInfo::unmarshal((char *)p, moi_len, NULL);
+ ink_assert(res > 0);
+ c->ic_old_info.get_handle((char *)p, moi_len);
+ ink_assert(c->ic_old_info.valid());
+ ci = &c->ic_old_info;
+ }
+ if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) {
+ ink_assert(!ci);
+ ci = (CacheHTTPInfo *)CACHE_ALLOW_MULTIPLE_WRITES;
+ }
+ moi_len -= res;
+ p += res;
- CacheKey key(msg->url_md5);
- char *hostname = NULL;
+ CacheKey key(msg->url_md5);
+ char *hostname = NULL;
- if (moi_len) {
- hostname = (char *) p;
- }
+ if (moi_len) {
+ hostname = (char *)p;
+ }
- Cache *call_cache = caches[c->frag_type];
- Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache,
- NULL, c->frag_type, hostname, moi_len);
- if (a != ACTION_RESULT_DONE) {
- c->cache_action = a;
- }
- break;
+ Cache *call_cache = caches[c->frag_type];
+ Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache, NULL, c->frag_type, hostname, moi_len);
+ if (a != ACTION_RESULT_DONE) {
+ c->cache_action = a;
}
- case CACHE_REMOVE:
- {
- CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
- init_from_short(c, msg, ch->machine);
- Debug("cache_msg",
- "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
+ break;
+ }
+ case CACHE_REMOVE: {
+ CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
+ init_from_short(c, msg, ch->machine);
+ Debug("cache_msg", "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
- log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
+ log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
#endif
- CacheKey key(msg->md5);
+ CacheKey key(msg->md5);
- char *hostname = NULL;
- int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
- if (host_len) {
- hostname = (char *) msg->moi.byte;
- }
+ char *hostname = NULL;
+ int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
+ if (host_len) {
+ hostname = (char *)msg->moi.byte;
+ }
- Cache *call_cache = caches[c->frag_type];
- Action *a = call_cache->remove(c, &key, c->frag_type,
- !!(c->cfl_flags & CFL_REMOVE_USER_AGENTS),
- !!(c->cfl_flags & CFL_REMOVE_LINK),
- hostname, host_len);
- if (a != ACTION_RESULT_DONE) {
- c->cache_action = a;
- }
- break;
+ Cache *call_cache = caches[c->frag_type];
+ Action *a = call_cache->remove(c, &key, c->frag_type, !!(c->cfl_flags & CFL_REMOVE_USER_AGENTS),
+ !!(c->cfl_flags & CFL_REMOVE_LINK), hostname, host_len);
+ if (a != ACTION_RESULT_DONE) {
+ c->cache_action = a;
}
- case CACHE_LINK:
- {
- CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
- init_from_short_2(c, msg, ch->machine);
- Debug("cache_msg",
- "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
+ break;
+ }
+ case CACHE_LINK: {
+ CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
+ init_from_short_2(c, msg, ch->machine);
+ Debug("cache_msg", "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
- log_cache_op_msg(msg->seq_number, len, "cache_op_link");
+ log_cache_op_msg(msg->seq_number, len, "cache_op_link");
#endif
- CacheKey key1(msg->md5_1);
- CacheKey key2(msg->md5_2);
+ CacheKey key1(msg->md5_1);
+ CacheKey key2(msg->md5_2);
- char *hostname = NULL;
- int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
- if (host_len) {
- hostname = (char *) msg->moi.byte;
- }
+ char *hostname = NULL;
+ int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
+ if (host_len) {
+ hostname = (char *)msg->moi.byte;
+ }
- Cache *call_cache = caches[c->frag_type];
- Action *a = call_cache->link(c, &key1, &key2, c->frag_type,
- hostname, host_len);
- if (a != ACTION_RESULT_DONE) {
- c->cache_action = a;
- }
- break;
+ Cache *call_cache = caches[c->frag_type];
+ Action *a = call_cache->link(c, &key1, &key2, c->frag_type, hostname, host_len);
+ if (a != ACTION_RESULT_DONE) {
+ c->cache_action = a;
}
- case CACHE_DEREF:
- {
- CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
- init_from_short(c, msg, ch->machine);
- Debug("cache_msg",
- "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
+ break;
+ }
+ case CACHE_DEREF: {
+ CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
+ init_from_short(c, msg, ch->machine);
+ Debug("cache_msg", "cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
- log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
+ log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
#endif
- CacheKey key(msg->md5);
+ CacheKey key(msg->md5);
- char *hostname = NULL;
- int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
- if (host_len) {
- hostname = (char *) msg->moi.byte;
- }
-
- Cache *call_cache = caches[c->frag_type];
- Action *a = call_cache->deref(c, &key, c->frag_type,
- hostname, host_len);
- if (a != ACTION_RESULT_DONE) {
- c->cache_action = a;
- }
- break;
+ char *hostname = NULL;
+ int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
+ if (host_len) {
+ hostname = (char *)msg->moi.byte;
}
- default:
- {
- ink_release_assert(0);
+ Cache *call_cache = caches[c->frag_type];
+ Action *a = call_cache->deref(c, &key, c->frag_type, hostname, host_len);
+ if (a != ACTION_RESULT_DONE) {
+ c->cache_action = a;
}
- } // End of switch
+ break;
+ }
+
+ default: {
+ ink_release_assert(0);
+ }
+ } // End of switch
}
void
@@ -1465,13 +1382,13 @@ cache_op_malloc_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
cache_op_ClusterFunction(ch, data, len);
// We own the message data, free it back to the Cluster subsystem
- clusterProcessor.free_remote_data((char *) data, len);
+ clusterProcessor.free_remote_data((char *)data, len);
}
int
-CacheContinuation::setupVCdataRead(int event, VConnection * vc)
+CacheContinuation::setupVCdataRead(int event, VConnection *vc)
{
- ink_assert(magicno == (int) MagicNo);
+ ink_assert(magicno == (int)MagicNo);
//
// Setup the initial data read for the given Cache VC.
// This data is sent back in the response message.
@@ -1482,27 +1399,27 @@ CacheContinuation::setupVCdataRead(int event, VConnection * vc)
//////////////////////////////////////////
Debug("cache_proto", "setupVCdataRead CACHE_EVENT_OPEN_READ seqno=%d", seq_number);
ink_release_assert(caller_buf_freebytes);
- SET_HANDLER((CacheContHandler) & CacheContinuation::VCdataRead);
+ SET_HANDLER((CacheContHandler)&CacheContinuation::VCdataRead);
int64_t size_index = iobuffer_size_to_index(caller_buf_freebytes);
MIOBuffer *buf = new_MIOBuffer(size_index);
readahead_reader = buf->alloc_reader();
- MUTEX_TRY_LOCK(lock, mutex, this_ethread()); // prevent immediate callback
+ MUTEX_TRY_LOCK(lock, mutex, this_ethread()); // prevent immediate callback
readahead_vio = vc->do_io_read(this, caller_buf_freebytes, buf);
return EVENT_DONE;
} else {
// Error case, deflect processing to replyOpEvent.
- SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
+ SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent);
return handleEvent(event, vc);
}
}
int
-CacheContinuation::VCdataRead(int event, VIO * target_vio)
+CacheContinuation::VCdataRead(int event, VIO *target_vio)
{
- ink_release_assert(magicno == (int) MagicNo);
+ ink_release_assert(magicno == (int)MagicNo);
ink_release_assert(readahead_vio == target_vio);
VConnection *vc = target_vio->vc_server;
@@ -1510,129 +1427,121 @@ CacheContinuation::VCdataRead(int event, VIO * target_vio)
int32_t object_size;
switch (event) {
- case VC_EVENT_EOS:
- {
- if (!target_vio->ndone) {
- // Doc with zero byte body, handle as read failure
- goto read_failed;
- }
- // Fall through
+ case VC_EVENT_EOS: {
+ if (!target_vio->ndone) {
+ // Doc with zero byte body, handle as read failure
+ goto read_failed;
}
+ // Fall through
+ }
case VC_EVENT_READ_READY:
- case VC_EVENT_READ_COMPLETE:
- {
- int clone_bytes;
- int current_ndone = target_vio->ndone;
+ case VC_EVENT_READ_COMPLETE: {
+ int clone_bytes;
+ int current_ndone = target_vio->ndone;
- ink_assert(current_ndone);
- ink_assert(current_ndone <= readahead_reader->read_avail());
+ ink_assert(current_ndone);
+ ink_assert(current_ndone <= readahead_reader->read_avail());
- object_size = getObjectSize(vc, request_opcode, &cache_vc_info);
- have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone));
+ object_size = getObjectSize(vc, request_opcode, &cache_vc_info);
+ have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone));
- // Use no more than the caller's max buffer limit
+ // Use no more than the caller's max buffer limit
- clone_bytes = current_ndone;
- if (!have_all_data) {
- if (current_ndone > caller_buf_freebytes) {
- clone_bytes = caller_buf_freebytes;
- }
+ clone_bytes = current_ndone;
+ if (!have_all_data) {
+ if (current_ndone > caller_buf_freebytes) {
+ clone_bytes = caller_buf_freebytes;
}
- // Clone data
-
- IOBufferBlock *tail;
- readahead_data = clone_IOBufferBlockList(readahead_reader->get_current_block(),
- readahead_reader->start_offset, clone_bytes, &tail);
+ }
+ // Clone data
- if (have_all_data) {
- // Close VC, since no more data and also to avoid VC_EVENT_EOS
+ IOBufferBlock *tail;
+ readahead_data =
+ clone_IOBufferBlockList(readahead_reader->get_current_block(), readahead_reader->start_offset, clone_bytes, &tail);
- MIOBuffer *mbuf = target_vio->buffer.writer();
- vc->do_io(VIO::CLOSE);
- free_MIOBuffer(mbuf);
- readahead_vio = 0;
- }
- SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
- handleEvent(reply, vc);
- return EVENT_CONT;
- }
- case VC_EVENT_ERROR:
- case VC_EVENT_INACTIVITY_TIMEOUT:
- case VC_EVENT_ACTIVE_TIMEOUT:
- default:
- {
- read_failed:
- // Read failed, deflect to replyOpEvent.
+ if (have_all_data) {
+ // Close VC, since no more data and also to avoid VC_EVENT_EOS
- MIOBuffer * mbuf = target_vio->buffer.writer();
+ MIOBuffer *mbuf = target_vio->buffer.writer();
vc->do_io(VIO::CLOSE);
free_MIOBuffer(mbuf);
readahead_vio = 0;
- reply = CACHE_EVENT_OPEN_READ_FAILED;
-
- SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
- handleEvent(reply, (VConnection *) - ECLUSTER_ORB_DATA_READ);
- return EVENT_DONE;
}
- } // End of switch
+ SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent);
+ handleEvent(reply, vc);
+ return EVENT_CONT;
+ }
+ case VC_EVENT_ERROR:
+ case VC_EVENT_INACTIVITY_TIMEOUT:
+ case VC_EVENT_ACTIVE_TIMEOUT:
+ default: {
+ read_failed:
+ // Read failed, deflect to replyOpEvent.
+
+ MIOBuffer *mbuf = target_vio->buffer.writer();
+ vc->do_io(VIO::CLOSE);
+ free_MIOBuffer(mbuf);
+ readahead_vio = 0;
+ reply = CACHE_EVENT_OPEN_READ_FAILED;
+
+ SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent);
+ handleEvent(reply, (VConnection *)-ECLUSTER_ORB_DATA_READ);
+ return EVENT_DONE;
+ }
+ } // End of switch
}
int
-CacheContinuation::setupReadWriteVC(int event, VConnection * vc)
+CacheContinuation::setupReadWriteVC(int event, VConnection *vc)
{
// Only handles OPEN_READ_LONG processing.
switch (event) {
- case CACHE_EVENT_OPEN_READ:
- {
- // setup readahead
+ case CACHE_EVENT_OPEN_READ: {
+ // setup readahead
- SET_HANDLER((CacheContHandler) & CacheContinuation::setupVCdataRead);
- return handleEvent(event, vc);
- break;
- }
- case CACHE_EVENT_OPEN_READ_FAILED:
- {
- if (frag_type == CACHE_FRAG_TYPE_HTTP && !request_purge) {
- // HTTP open read failed, attempt open write now to avoid an additional
- // message round trip
-
- CacheKey key(url_md5);
-
- Cache *call_cache = caches[frag_type];
- Action *a = call_cache->open_write(this, &key, 0, pin_in_cache,
- NULL, frag_type, ic_hostname ? ic_hostname->data() : NULL,
- ic_hostname_len);
- if (a != ACTION_RESULT_DONE) {
- cache_action = a;
- }
- } else {
- SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
- return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
+ SET_HANDLER((CacheContHandler)&CacheContinuation::setupVCdataRead);
+ return handleEvent(event, vc);
+ break;
+ }
+ case CACHE_EVENT_OPEN_READ_FAILED: {
+ if (frag_type == CACHE_FRAG_TYPE_HTTP && !request_purge) {
+ // HTTP open read failed, attempt open write now to avoid an additional
+ // message round trip
+
+ CacheKey key(url_md5);
+
+ Cache *call_cache = caches[frag_type];
+ Action *a = call_cache->open_write(this, &key, 0, pin_in_cache, NULL, frag_type, ic_hostname ? ic_hostname->data() : NULL,
+ ic_hostname_len);
+ if (a != ACTION_RESULT_DONE) {
+ cache_action = a;
}
- break;
+ } else {
+ SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent);
+ return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
}
- case CACHE_EVENT_OPEN_WRITE:
- {
- // Convert from read to write connection
+ break;
+ }
+ case CACHE_EVENT_OPEN_WRITE: {
+ // Convert from read to write connection
- ink_assert(!read_cluster_vc && write_cluster_vc);
- read_cluster_vc = write_cluster_vc;
- read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
- write_cluster_vc = 0;
+ ink_assert(!read_cluster_vc && write_cluster_vc);
+ read_cluster_vc = write_cluster_vc;
+ read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
+ write_cluster_vc = 0;
- SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
- return handleEvent(event, vc);
- break;
- }
+ SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent);
+ return handleEvent(event, vc);
+ break;
+ }
case CACHE_EVENT_OPEN_WRITE_FAILED:
- default:
- {
- SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
- return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
- break;
- }
- } // end of switch
+ default: {
+ SET_HANDLER((CacheContHandler)&CacheContinuation::replyOpEvent);
+ return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
+ break;
+ }
+ } // end of switch
return EVENT_DONE;
}
@@ -1642,16 +1551,16 @@ CacheContinuation::setupReadWriteVC(int event, VConnection * vc)
// Reflect the (local) reply back to the (remote) requesting node.
/////////////////////////////////////////////////////////////////////////
int
-CacheContinuation::replyOpEvent(int event, VConnection * cvc)
+CacheContinuation::replyOpEvent(int event, VConnection *cvc)
{
- ink_assert(magicno == (int) MagicNo);
+ ink_assert(magicno == (int)MagicNo);
Debug("cache_proto", "replyOpEvent(this=%p,event=%d,VC=%p)", this, event, cvc);
ink_hrtime now;
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
ink_release_assert(expect_cache_callback);
- expect_cache_callback = false; // make sure we are called back exactly once
+ expect_cache_callback = false; // make sure we are called back exactly once
result = event;
@@ -1664,8 +1573,7 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
CacheOpReplyMsg *msg = &rmsg;
msg->result = event;
- if ((request_opcode == CACHE_OPEN_READ_LONG)
- && cvc && (event == CACHE_EVENT_OPEN_WRITE)) {
+ if ((request_opcode == CACHE_OPEN_READ_LONG) && cvc && (event == CACHE_EVENT_OPEN_WRITE)) {
//////////////////////////////////////////////////////////////////////////
// open read failed, but open write succeeded, set result to
// CACHE_EVENT_OPEN_READ_FAILED and make result token non zero to
@@ -1676,17 +1584,16 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
}
msg->seq_number = seq_number;
- int flen = CacheOpReplyMsg::sizeof_fixedlen_msg(); // include token
+ int flen = CacheOpReplyMsg::sizeof_fixedlen_msg(); // include token
int len = 0;
int vers = 0;
int results_expected = 1;
- if (no_reply_message) // CACHE_NO_RESPONSE request
+ if (no_reply_message) // CACHE_NO_RESPONSE request
goto free_exit;
if (open) {
-
// prepare for CACHE_OPEN_EVENT
results_expected = 2;
@@ -1695,20 +1602,20 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
if (read_op && !open_read_now_open_write) {
ink_release_assert(write_cluster_vc->pending_remote_fill);
- ink_assert(have_all_data || (readahead_vio == &((CacheVC *) cache_vc)->vio));
+ ink_assert(have_all_data || (readahead_vio == &((CacheVC *)cache_vc)->vio));
Debug("cache_proto", "connect_local success seqno=%d have_all_data=%d", seq_number, (have_all_data ? 1 : 0));
if (have_all_data) {
- msg->token.clear(); // Tell sender no conn established
+ msg->token.clear(); // Tell sender no conn established
write_cluster_vc->type = VC_CLUSTER_WRITE;
} else {
- msg->token = token; // Tell sender conn established
+ msg->token = token; // Tell sender conn established
setupReadBufTunnel(cache_vc, write_cluster_vc);
}
} else {
Debug("cache_proto", "cache_open [%s] success seqno=%d", (cache_read ? "R" : "W"), seq_number);
- msg->token = token; // Tell sender conn established
+ msg->token = token; // Tell sender conn established
OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex);
@@ -1723,17 +1630,17 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
msg->is_ram_cache_hit = ((CacheVC *)cache_vc)->is_ram_cache_hit();
if (!cache_vc_info.valid()) {
- (void) getObjectSize(cache_vc, request_opcode, &cache_vc_info);
+ (void)getObjectSize(cache_vc, request_opcode, &cache_vc_info);
}
// Determine data length and allocate
len = cache_vc_info.marshal_length();
- CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
+ CacheOpReplyMsg *reply = (CacheOpReplyMsg *)ALLOCA_DOUBLE(flen + len);
// Initialize reply message header
*reply = *msg;
// Marshal response data into reply message
- res = cache_vc_info.marshal((char *) reply + flen, len);
+ res = cache_vc_info.marshal((char *)reply + flen, len);
ink_assert(res >= 0 && res <= len);
// Make reply message the current message
@@ -1742,11 +1649,11 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
} else {
Debug("cache_proto", "cache operation failed result=%d seqno=%d (this=%p)", event, seq_number, this);
- msg->token.clear(); // Tell sender no conn established
+ msg->token.clear(); // Tell sender no conn established
// Reallocate reply message, allowing for marshalled data
len += sizeof(int32_t);
- CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
+ CacheOpReplyMsg *reply = (CacheOpReplyMsg *)ALLOCA_DOUBLE(flen + len);
// Initialize reply message header
*reply = *msg;
@@ -1756,24 +1663,24 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
// open read/write failed, close preallocated VC
//
if (read_cluster_vc) {
- read_cluster_vc->remote_closed = 1; // avoid remote close msg
+ read_cluster_vc->remote_closed = 1; // avoid remote close msg
read_cluster_vc->do_io(VIO::CLOSE);
}
if (write_cluster_vc) {
write_cluster_vc->pending_remote_fill = 0;
- write_cluster_vc->remote_closed = 1; // avoid remote close msg
+ write_cluster_vc->remote_closed = 1; // avoid remote close msg
write_cluster_vc->do_io(VIO::CLOSE);
}
- reply->moi.u32 = (int32_t) ((uintptr_t) cvc & 0xffffffff); // code describing failure
+ reply->moi.u32 = (int32_t)((uintptr_t)cvc & 0xffffffff); // code describing failure
}
// Make reply message the current message
msg = reply;
}
CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
- //
- // Send reply message
- //
+//
+// Send reply message
+//
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg(msg->seq_number, 0, "replyOpEvent");
#endif
@@ -1781,18 +1688,14 @@ CacheContinuation::replyOpEvent(int event, VConnection * cvc)
if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
if (read_op) {
// Transmit reply message and object data in same cluster message
- Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64,
- seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
- clusterProcessor.invoke_remote_data(ch,
- CACHE_OP_RESULT_CLUSTER_FUNCTION,
- (void *) msg, (flen + len),
- readahead_data,
- cluster_vc_channel, &token,
- &CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
+ Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64, seq_number,
+ readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
+ clusterProcessor.invoke_remote_data(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, (void *)msg, (flen + len), readahead_data,
+ cluster_vc_channel, &token, &CacheContinuation::disposeOfDataBuffer, (void *)this,
+ CLUSTER_OPT_STEAL);
} else {
Debug("cache_proto", "Sending reply seqno=%d, (this=%p)", seq_number, this);
- clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION,
- (void *) msg, (flen + len), CLUSTER_OPT_STEAL);
+ clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, (void *)msg, (flen + len), CLUSTER_OPT_STEAL);
}
} else {
@@ -1812,7 +1715,7 @@ free_exit:
}
void
-CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * cluster_write_vc)
+CacheContinuation::setupReadBufTunnel(VConnection *cache_read_vc, VConnection *cluster_write_vc)
{
////////////////////////////////////////////////////////////
// Setup OneWayTunnel and tunnel close event handler.
@@ -1820,22 +1723,21 @@ CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection *
////////////////////////////////////////////////////////////
tunnel_cont = cacheContAllocator_alloc();
tunnel_cont->mutex = this->mutex;
- SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler)
- & CacheContinuation::tunnelClosedEvent);
+ SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler)&CacheContinuation::tunnelClosedEvent);
int64_t ravail = bytes_IOBufferBlockList(readahead_data, 1);
tunnel_mutex = tunnel_cont->mutex;
tunnel_closed = false;
tunnel = OneWayTunnel::OneWayTunnel_alloc();
- readahead_reader->consume(ravail); // allow for bytes sent in initial reply
+ readahead_reader->consume(ravail); // allow for bytes sent in initial reply
tunnel->init(cache_read_vc, cluster_write_vc, tunnel_cont, readahead_vio, readahead_reader);
tunnel_cont->action = this;
tunnel_cont->tunnel = tunnel;
tunnel_cont->tunnel_cont = tunnel_cont;
// Disable cluster_write_vc
- ((ClusterVConnection *) cluster_write_vc)->write.enabled = 0;
+ ((ClusterVConnection *)cluster_write_vc)->write.enabled = 0;
// Disable cache read VC
readahead_vio->nbytes = readahead_vio->ndone;
@@ -1853,11 +1755,11 @@ CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection *
int
CacheContinuation::tunnelClosedEvent(int /* event ATS_UNUSED */, void *c)
{
- ink_assert(magicno == (int) MagicNo);
+ ink_assert(magicno == (int)MagicNo);
// Note: We are called with the tunnel_mutex held.
- CacheContinuation *tc = (CacheContinuation *) c;
+ CacheContinuation *tc = (CacheContinuation *)c;
ink_release_assert(tc->tunnel_cont == tc);
- CacheContinuation *real_cc = (CacheContinuation *) tc->action.continuation;
+ CacheContinuation *real_cc = (CacheContinuation *)tc->action.continuation;
if (real_cc) {
// Notify the real continuation of the tunnel closed event
@@ -1875,26 +1777,24 @@ CacheContinuation::tunnelClosedEvent(int /* event ATS_UNUSED */, void *c)
// Retry DisposeOfDataBuffer continuation
////////////////////////////////////////////////////////////
struct retryDisposeOfDataBuffer;
-typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler) (int, void *);
-struct retryDisposeOfDataBuffer:public Continuation
-{
+typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler)(int, void *);
+struct retryDisposeOfDataBuffer : public Continuation {
CacheContinuation *c;
- int handleRetryEvent(int event, Event * e)
+ int
+ handleRetryEvent(int event, Event *e)
{
if (CacheContinuation::handleDisposeEvent(event, c) == EVENT_DONE) {
delete this;
- return EVENT_DONE;
- } else
- {
+ return EVENT_DONE;
+ } else {
e->schedule_in(HRTIME_MSECONDS(10));
return EVENT_CONT;
}
}
- retryDisposeOfDataBuffer(CacheContinuation * cont)
-: Continuation(new_ProxyMutex()), c(cont) {
- SET_HANDLER((rtryDisOfDBufHandler)
- & retryDisposeOfDataBuffer::handleRetryEvent);
+ retryDisposeOfDataBuffer(CacheContinuation *cont) : Continuation(new_ProxyMutex()), c(cont)
+ {
+ SET_HANDLER((rtryDisOfDBufHandler)&retryDisposeOfDataBuffer::handleRetryEvent);
}
};
@@ -1906,9 +1806,9 @@ void
CacheContinuation::disposeOfDataBuffer(void *d)
{
ink_assert(d);
- CacheContinuation *cc = (CacheContinuation *) d;
+ CacheContinuation *cc = (CacheContinuation *)d;
ink_assert(cc->have_all_data || cc->readahead_vio);
- ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *) cc->cache_vc)->vio));
+ ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *)cc->cache_vc)->vio));
if (cc->have_all_data) {
//
@@ -1936,9 +1836,9 @@ CacheContinuation::disposeOfDataBuffer(void *d)
}
int
-CacheContinuation::handleDisposeEvent(int /* event ATS_UNUSED */, CacheContinuation * cc)
+CacheContinuation::handleDisposeEvent(int /* event ATS_UNUSED */, CacheContinuation *cc)
{
- ink_assert(cc->magicno == (int) MagicNo);
+ ink_assert(cc->magicno == (int)MagicNo);
MUTEX_TRY_LOCK(lock, cc->tunnel_mutex, this_ethread());
if (lock.is_locked()) {
// Write of initial object data is complete.
@@ -1981,13 +1881,13 @@ cache_op_result_ClusterFunct
<TRUNCATED>