You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by ge...@apache.org on 2010/02/24 19:48:52 UTC
svn commit: r915922 [4/14] - in /incubator/trafficserver/traffic/trunk: ./
install/ iocore/aio/ iocore/block-cache/ iocore/cache/ iocore/cluster/
iocore/dns/ iocore/eventsystem/ iocore/hostdb/ iocore/net/ iocore/utils/
libev/ libev/CVS/ libinktomi++/ l...
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc?rev=915922&r1=915921&r2=915922&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc Wed Feb 24 18:48:42 2010
@@ -29,60 +29,59 @@
Action *
Cache::open_read(Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
{
-
if (!(CacheProcessor::cache_ready & type)) {
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
-
ink_assert(caches[type] == this);
Part *part = key_to_part(key, hostname, host_len);
Dir result, *last_collision = NULL;
ProxyMutex *mutex = cont->mutex;
-
- CacheVC *c = new_CacheVC(cont);
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
- c->vio.op = VIO::READ;
- c->base_stat = cache_read_active_stat;
- CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
- c->first_key = c->key = c->earliest_key = *key;
- c->part = part;
- if (type == CACHE_FRAG_TYPE_HTTP) {
- c->f.http_request = 1;
- }
-
- CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (lock) {
- c->od = part->open_read(key);
- if (c->od) {
- // someone is currently writing the document
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
- if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE;
+ OpenDirEntry *od = NULL;
+ CacheVC *c = NULL;
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock || (od = part->open_read(key)) || dir_probe(key, part, &result, &last_collision)) {
+ c = new_CacheVC(cont);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+ c->vio.op = VIO::READ;
+ c->base_stat = cache_read_active_stat;
+ CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+ c->first_key = c->key = c->earliest_key = *key;
+ c->part = part;
+ c->frag_type = type;
+ c->od = od;
}
- // no writer
- if (!dir_probe(key, part, &result, &last_collision)) {
- //release the lock before calling handleEvent??
- CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
- cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
- free_CacheVC(c);
- return ACTION_RESULT_DONE;
+ if (!lock) {
+ CONT_SCHED_LOCK_RETRY(c);
+ return &c->_action;
}
-
+ if (!c)
+ goto Lmiss;
+ if (c->od)
+ goto Lwriter;
c->dir = result;
c->last_collision = last_collision;
+ switch(c->do_read_call(&c->key)) {
+ case EVENT_DONE: return ACTION_RESULT_DONE;
+ case EVENT_RETURN: goto Lcallreturn;
+ default: return &c->_action;
+ }
}
- if (!lock) {
- CONT_SCHED_LOCK_RETRY(c);
- return &c->_action;
- }
- if (c->do_read(&c->key) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE; // ram cache hit
+Lmiss:
+ CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
+ cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
+ return ACTION_RESULT_DONE;
+Lwriter:
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+ if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
+Lcallreturn:
+ if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
}
#ifdef HTTP_CACHE
@@ -95,59 +94,62 @@
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
-
ink_assert(caches[type] == this);
Part *part = key_to_part(key, hostname, host_len);
Dir result, *last_collision = NULL;
ProxyMutex *mutex = cont->mutex;
+ OpenDirEntry *od = NULL;
+ CacheVC *c = NULL;
- CacheVC *c = new_CacheVC(cont);
- c->first_key = c->key = c->earliest_key = *key;
- c->part = part;
- c->vio.op = VIO::READ;
- c->base_stat = cache_read_active_stat;
- CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
- c->request.copy_shallow(request);
- c->f.http_request = 1;
- c->params = params;
-
- CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (lock) {
- c->od = part->open_read(key);
- if (c->od) {
- //Type-casting the continuation to HttpCacheSM object
- HttpCacheSM *cachesm = (HttpCacheSM *) cont;
- //Setting the read_while_write_inprogress flag
- cachesm->set_readwhilewrite_inprogress(true);
-
- // someone is currently writing the document
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
- if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE;
- }
- if (!dir_probe(key, part, &result, &last_collision)) {
- //release the lock before calling handleEvent???
- CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
- cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
- free_CacheVC(c);
- return ACTION_RESULT_DONE;
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock || (od = part->open_read(key)) || dir_probe(key, part, &result, &last_collision)) {
+ c = new_CacheVC(cont);
+ c->first_key = c->key = c->earliest_key = *key;
+ c->part = part;
+ c->vio.op = VIO::READ;
+ c->base_stat = cache_read_active_stat;
+ CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+ c->request.copy_shallow(request);
+ c->frag_type = CACHE_FRAG_TYPE_HTTP;
+ c->params = params;
+ c->od = od;
}
+ if (!lock) {
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+ CONT_SCHED_LOCK_RETRY(c);
+ return &c->_action;
+ }
+ if (!c)
+ goto Lmiss;
+ if (c->od)
+ goto Lwriter;
+ // hit
c->dir = c->first_dir = result;
c->last_collision = last_collision;
-
- }
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
- if (!lock) {
- CONT_SCHED_LOCK_RETRY(c);
- return &c->_action;
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+ switch(c->do_read_call(&c->key)) {
+ case EVENT_DONE: return ACTION_RESULT_DONE;
+ case EVENT_RETURN: goto Lcallreturn;
+ default: return &c->_action;
+ }
}
- if (c->do_read(&c->key) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE; // ram cache hit
+Lmiss:
+ CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
+ cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
+ return ACTION_RESULT_DONE;
+Lwriter:
+ // this is a horrible violation of the interface and should be fixed (FIXME)
+ ((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+ if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
+Lcallreturn:
+ if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
}
#endif
@@ -173,15 +175,21 @@
intptr_t err = ECACHE_DOC_BUSY;
CacheVC *w = NULL;
- if (!f.http_request) {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock)
+ VC_SCHED_LOCK_RETRY();
+
+ if (frag_type != CACHE_FRAG_TYPE_HTTP) {
ink_assert(od->num_writers == 1);
w = od->writers.head;
if (w->start_time > start_time || w->closed < 0) {
+ MUTEX_RELEASE(lock);
od = NULL;
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(EVENT_IMMEDIATE, 0);
}
if (!w->closed) {
+ MUTEX_RELEASE(lock);
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) -err);
}
write_vc = w;
@@ -198,6 +206,7 @@
if (w->start_time > start_time || w->closed < 0)
continue;
if (!w->closed && !cache_config_read_while_writer) {
+ MUTEX_RELEASE(lock);
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
}
if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT)
@@ -227,13 +236,15 @@
if (!vector.count()) {
if (od->reading_vec) {
- // the writer(s) are reading the vector, so there is probably
+ MUTEX_RELEASE(lock);
+ // the writer(s) are reading the vector, so there is probably
// an old vector. Since this reader came before any of the
// current writers, we should return the old data
od = NULL;
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(EVENT_IMMEDIATE, 0);
} else {
+ MUTEX_RELEASE(lock);
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - ECACHE_NO_DOC);
}
}
@@ -241,6 +252,7 @@
if (cache_config_select_alternate) {
alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
if (alternate_index < 0)
+ MUTEX_RELEASE(lock);
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - ECACHE_ALT_MISS);
} else
#endif
@@ -254,13 +266,14 @@
}
vector.clear(false);
if (!write_vc) {
- Debug("cache_read_agg", "%x: key: %X writer alternate different: %d", this, first_key.word(1), alternate_index);
+ MUTEX_RELEASE(lock);
+ DDebug("cache_read_agg", "%x: key: %X %X writer alternate different: %d", this, first_key.word(1), alternate_index);
od = NULL;
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(EVENT_IMMEDIATE, 0);
}
- Debug("cache_read_agg",
+ DDebug("cache_read_agg",
"%x: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %x",
this, first_key.word(1), write_vc->earliest_key.word(1),
vector.count(), alternate_index, od->num_writers, write_vc);
@@ -286,34 +299,32 @@
}
cancel_trigger();
intptr_t err = ECACHE_DOC_BUSY;
- Debug("cache_read_agg", "%x: key: %X In openReadFromWriter", this, first_key.word(1));
+ DDebug("cache_read_agg", "%x: key: %X In openReadFromWriter", this, first_key.word(1));
#ifndef READ_WHILE_WRITER
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) -err);
#else
if (_action.cancelled)
return free_CacheVC(this);
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (!lock) {
- ink_assert(!od);
+ if (!lock)
VC_SCHED_LOCK_RETRY();
- }
+ od = part->open_read(&first_key); // recheck in case the lock failed
if (!od) {
- od = part->open_read(&first_key);
- if (!od) {
- write_vc = NULL;
- SET_HANDLER(&CacheVC::openReadStartHead);
- return openReadStartHead(event, e);
- }
+ MUTEX_RELEASE(lock);
+ write_vc = NULL;
+ SET_HANDLER(&CacheVC::openReadStartHead);
+ return openReadStartHead(event, e);
} else
ink_debug_assert(od == part->open_read(&first_key));
-
if (!write_vc) {
+ MUTEX_RELEASE(lock);
int ret = openReadChooseWriter(event, e);
if (ret == EVENT_DONE || !write_vc)
return ret;
} else {
if (writer_done()) {
- Debug("cache_read_agg",
+ MUTEX_RELEASE(lock);
+ DDebug("cache_read_agg",
"%x: key: %X writer %x has left, continuing as normal read", this, first_key.word(1), write_vc);
od = NULL;
write_vc = NULL;
@@ -325,35 +336,37 @@
od = NULL;
// someone is currently writing the document
if (write_vc->closed < 0) {
+ MUTEX_RELEASE(lock);
write_vc = NULL;
//writer aborted, continue as if there is no writer
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(EVENT_IMMEDIATE, 0);
}
// allow reading from unclosed writer for http requests only.
- ink_assert(f.http_request || write_vc->closed);
- if (!write_vc->closed && !write_vc->segment) {
- if (!cache_config_read_while_writer || !f.http_request)
+ ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
+ if (!write_vc->closed && !write_vc->fragment) {
+ if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP)
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
- Debug("cache_read_agg",
- "%x: key: %X writer: closed:%d, segment:%d, retry: %d",
- this, first_key.word(1), write_vc->closed, write_vc->segment, writer_lock_retry);
+ DDebug("cache_read_agg",
+ "%x: key: %X writer: closed:%d, fragment:%d, retry: %d",
+ this, first_key.word(1), write_vc->closed, write_vc->fragment, writer_lock_retry);
VC_SCHED_WRITER_RETRY();
}
CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
if (!writer_lock) {
- Debug("cache_read_agg", "%x: key: %X lock miss", this, first_key.word(1));
+ DDebug("cache_read_agg", "%x: key: %X lock miss", this, first_key.word(1));
VC_SCHED_LOCK_RETRY();
}
+ MUTEX_RELEASE(lock);
if (!write_vc->io.ok())
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
#ifdef HTTP_CACHE
- if (f.http_request) {
- Debug("cache_read_agg",
- "%x: key: %X http passed stage 1, closed: %d, seg: %d",
- this, first_key.word(1), write_vc->closed, write_vc->segment);
+ if (frag_type == CACHE_FRAG_TYPE_HTTP) {
+ DDebug("cache_read_agg",
+ "%x: key: %X http passed stage 1, closed: %d, frag: %d",
+ this, first_key.word(1), write_vc->closed, write_vc->fragment);
if (!write_vc->alternate.valid())
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
alternate.copy(&write_vc->alternate);
@@ -369,7 +382,7 @@
} else {
key = write_vc->update_key;
ink_assert(write_vc->closed);
- Debug("cache_read_agg", "%x: key: %X writer header update", this, first_key.word(1));
+ DDebug("cache_read_agg", "%x: key: %X writer header update", this, first_key.word(1));
// Update case (b) : grab doc_len from the writer's alternate
doc_len = alternate.object_size_get();
if (write_vc->update_key == cod->single_doc_key &&
@@ -378,19 +391,18 @@
// header only update. The first_buf of the writer has the
// document body.
Doc *doc = (Doc *) write_vc->first_buf->data();
- writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), sizeofDoc + doc->hlen);
+ writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
MUTEX_RELEASE(writer_lock);
ink_assert(doc_len == doc->data_len());
length = doc_len;
- f.single_segment = 1;
- docpos = 0;
+ f.single_fragment = 1;
+ doc_pos = 0;
earliest_key = key;
dir_clean(&first_dir);
dir_clean(&earliest_dir);
SET_HANDLER(&CacheVC::openReadFromWriterMain);
CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
- _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
- return EVENT_DONE;
+ return callcont(CACHE_EVENT_OPEN_READ);
}
// want to snarf the new headers from the writer
// and then continue as if nothing happened
@@ -401,17 +413,17 @@
}
} else {
#endif //HTTP_CACHE
- Debug("cache_read_agg", "%x: key: %X non-http passed stage 1", this, first_key.word(1));
+ DDebug("cache_read_agg", "%x: key: %X non-http passed stage 1", this, first_key.word(1));
key = write_vc->earliest_key;
#ifdef HTTP_CACHE
}
#endif
- if (write_vc->segment) {
+ if (write_vc->fragment) {
doc_len = write_vc->vio.nbytes;
last_collision = NULL;
- Debug("cache_read_agg",
- "%x: key: %X closed: %d, segment: %d, len: %d starting first segment",
- this, first_key.word(1), write_vc->closed, write_vc->segment, doc_len);
+ DDebug("cache_read_agg",
+ "%x: key: %X closed: %d, fragment: %d, len: %d starting first fragment",
+ this, first_key.word(1), write_vc->closed, write_vc->fragment, (int)doc_len);
MUTEX_RELEASE(writer_lock);
// either a header + body update or a new document
SET_HANDLER(&CacheVC::openReadStartEarliest);
@@ -420,23 +432,19 @@
writer_buf = write_vc->blocks;
writer_offset = write_vc->offset;
length = write_vc->length;
- //copy the vector
- f.single_segment = !write_vc->segment; //single segment doc
- docpos = 0;
+ // copy the vector
+ f.single_fragment = !write_vc->fragment; // single fragment doc
+ doc_pos = 0;
earliest_key = write_vc->earliest_key;
ink_assert(earliest_key == key);
doc_len = write_vc->total_len;
dir_clean(&first_dir);
dir_clean(&earliest_dir);
- Debug("cache_read_agg", "%x key: %X %X: single segment read", first_key.word(1), key.word(0));
-
+ DDebug("cache_read_agg", "%x key: %X %X: single fragment read", first_key.word(1), key.word(0));
MUTEX_RELEASE(writer_lock);
- //we've got everything....ready to roll!!
SET_HANDLER(&CacheVC::openReadFromWriterMain);
CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
- _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
- return EVENT_DONE;
-
+ return callcont(CACHE_EVENT_OPEN_READ);
#endif //READ_WHILE_WRITER
}
@@ -446,57 +454,47 @@
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
- IOBufferBlock *b = NULL;
- int ntodo = vio.ntodo();
cancel_trigger();
+ if (seek_to) {
+ vio.ndone = seek_to;
+ seek_to = 0;
+ }
+ IOBufferBlock *b = NULL;
+ ink64 ntodo = vio.ntodo();
if (ntodo <= 0)
return EVENT_CONT;
- if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
- return EVENT_CONT;
-
- if (length < (doc_len - vio.ndone)) {
- Debug("cache_read_agg", "truncation %X", earliest_key.word(0));
+ if (length < ((ink64)doc_len) - vio.ndone) {
+ DDebug("cache_read_agg", "truncation %X", earliest_key.word(0));
if (is_action_tag_set("cache")) {
ink_release_assert(false);
}
-// action_tag_assert("cache", false);
Warning("Document for %X truncated", earliest_key.word(0));
- calluser(VC_EVENT_ERROR);
- return EVENT_CONT;
+ return calluser(VC_EVENT_ERROR);
}
/* its possible that the user did a do_io_close before
openWriteWriteDone was called. */
- if (length > (doc_len - vio.ndone)) {
- int skip_bytes = length - (doc_len - vio.ndone);
+ if (length > ((ink64)doc_len) - vio.ndone) {
+ ink64 skip_bytes = length - (doc_len - vio.ndone);
iobufferblock_skip(writer_buf, &writer_offset, &length, skip_bytes);
}
- int bytes = length;
- if (length > vio.ntodo())
+ ink64 bytes = length;
+ if (bytes > vio.ntodo())
bytes = vio.ntodo();
-
- if (vio.ndone >= doc_len) {
+ if (vio.ndone >= (ink64)doc_len) {
ink_assert(bytes <= 0);
// reached the end of the document and the user still wants more
- calluser(VC_EVENT_EOS);
- return EVENT_DONE;
+ return calluser(VC_EVENT_EOS);
}
b = iobufferblock_clone(writer_buf, writer_offset, bytes);
writer_buf = iobufferblock_skip(writer_buf, &writer_offset, &length, bytes);
-
vio.buffer.mbuf->append_block(b);
vio.ndone += bytes;
- if (vio.ntodo() <= 0) {
- calluser(VC_EVENT_READ_COMPLETE);
- return EVENT_DONE;
- } else {
- if (calluser(VC_EVENT_READ_READY))
- return EVENT_DONE;
- else
- return EVENT_CONT;
- }
+ if (vio.ntodo() <= 0)
+ return calluser(VC_EVENT_READ_COMPLETE);
+ else
+ return calluser(VC_EVENT_READ_READY);
}
-
int
CacheVC::openReadClose(int event, Event * e)
{
@@ -509,12 +507,12 @@
return EVENT_CONT;
set_io_not_in_progress();
}
- MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
VC_SCHED_LOCK_RETRY();
#ifdef HIT_EVACUATE
if (f.hit_evacuate && dir_valid(part, &first_dir) && closed > 0) {
- if (f.single_segment)
+ if (f.single_fragment)
part->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
else if (dir_valid(part, &earliest_dir)) {
part->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
@@ -535,68 +533,70 @@
if (event == EVENT_IMMEDIATE)
return EVENT_CONT;
set_io_not_in_progress();
- if (event == AIO_EVENT_DONE && !io.ok())
- goto Ldone;
- if (last_collision && // no missed lock
- dir_valid(part, &dir)) // object still valid
- {
- doc = (Doc *) buf->data();
- if (doc->magic != DOC_MAGIC) {
- char tmpstring[100];
- if (doc->magic == DOC_CORRUPT)
- Warning("Middle: Doc checksum does not match for %s", key.string(tmpstring));
- else
- Warning("Middle: Doc magic does not match for %s", key.string(tmpstring));
- goto Ldone;
- }
-
- if (doc->key == key) {
- docpos = sizeofDoc + doc->hlen;
- next_CacheKey(&key, &key);
- SET_HANDLER(&CacheVC::openReadMain);
- return openReadMain(event, e);
- }
- }
{
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
VC_SCHED_LOCK_RETRY();
-
+ if (event == AIO_EVENT_DONE && !io.ok()) {
+ dir_delete(&earliest_key, part, &earliest_dir);
+ goto Lerror;
+ }
+ if (last_collision && // no missed lock
+ dir_valid(part, &dir)) // object still valid
+ {
+ doc = (Doc *) buf->data();
+ if (doc->magic != DOC_MAGIC) {
+ char tmpstring[100];
+ if (doc->magic == DOC_CORRUPT)
+ Warning("Middle: Doc checksum does not match for %s", key.string(tmpstring));
+ else
+ Warning("Middle: Doc magic does not match for %s", key.string(tmpstring));
+ goto Lerror;
+ }
+ if (doc->key == key)
+ goto LreadMain;
+ }
if (last_collision && dir_offset(&dir) != dir_offset(last_collision))
last_collision = 0; // object has been/is being overwritten
if (dir_probe(&key, part, &dir, &last_collision)) {
- do_read(&key);
+ int ret = do_read_call(&key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
return EVENT_CONT;
} else if (write_vc) {
if (writer_done()) {
last_collision = NULL;
while (dir_probe(&earliest_key, part, &dir, &last_collision)) {
if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
- Debug("cache_read_agg", "%x: key: %X ReadRead complete: %d", this, first_key.word(1), vio.ndone);
+ DDebug("cache_read_agg", "%x: key: %X ReadRead complete: %d",
+ this, first_key.word(1), (int)vio.ndone);
doc_len = vio.ndone;
- calluser(VC_EVENT_EOS);
- return EVENT_DONE;
+ goto Ldone;
}
}
- Debug("cache_read_agg", "%x: key: %X ReadRead writer aborted: %d", this, first_key.word(1), vio.ndone);
- calluser(VC_EVENT_ERROR);
- return EVENT_DONE;
+ DDebug("cache_read_agg", "%x: key: %X ReadRead writer aborted: %d",
+ this, first_key.word(1), (int)vio.ndone);
+ goto Lerror;
}
- Debug("cache_read_agg", "%x: key: %X ReadRead retrying: %d", this, first_key.word(1), vio.ndone);
- VC_SCHED_WRITER_RETRY();
+ DDebug("cache_read_agg", "%x: key: %X ReadRead retrying: %d", this, first_key.word(1), (int)vio.ndone);
+ VC_SCHED_WRITER_RETRY(); // wait for writer
}
+ // fall through for truncated documents
}
-Ldone:
-// action_tag_assert("cache", false);
- if (is_action_tag_set("cache")) {
- ink_release_assert(false);
- }
- // remove the directory entry
- dir_delete_lock(&earliest_key, part, mutex, &earliest_dir);
+Lerror:
char tmpstring[100];
Warning("Document truncated for %s", earliest_key.string(tmpstring));
- calluser(VC_EVENT_ERROR);
- return EVENT_CONT;
+ return calluser(VC_EVENT_ERROR);
+Ldone:
+ return calluser(VC_EVENT_EOS);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0);
+LreadMain:
+ fragment++;
+ doc_pos = doc->prefix_len();
+ next_CacheKey(&key, &key);
+ SET_HANDLER(&CacheVC::openReadMain);
+ return openReadMain(event, e);
}
int
@@ -605,30 +605,53 @@
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
- IOBufferBlock *b = NULL;
- int ntodo = vio.ntodo();
cancel_trigger();
+ Doc *doc = (Doc *) buf->data();
+ ink64 ntodo = vio.ntodo();
+ ink64 bytes = doc->len - doc_pos;
+ IOBufferBlock *b = NULL;
+ if (seek_to) { // handle do_io_pread
+ if (seek_to >= (int)doc_len) {
+ vio.ndone = doc_len;
+ return calluser(VC_EVENT_EOS);
+ }
+ if (f.single_fragment) {
+ vio.ndone = seek_to;
+ seek_to = 0;
+ } else {
+ if (seek_to < doc->data_len()) {
+ vio.ndone += seek_to;
+ seek_to = 0;
+ } else {
+ Frag *frag = doc->frags();
+ // skip to correct key, key is already set to next fragment
+ for (inku32 i = 1; i <= doc->nfrags(); i++) {
+ if (seek_to < (int)frag[i].offset) break;
+ next_CacheKey(&key, &key);
+ }
+ seek_to = 0;
+ vio.ndone = seek_to;
+ goto Lread;
+ }
+ }
+ }
if (ntodo <= 0)
return EVENT_CONT;
if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
return EVENT_CONT;
- Doc *doc = (Doc *) buf->data();
- int bytes = doc->len - docpos;
- if ((bytes <= 0) && vio.ntodo() >= 0) {
+ if ((bytes <= 0) && vio.ntodo() >= 0)
goto Lread;
- }
if (bytes > vio.ntodo())
bytes = vio.ntodo();
- b = new_IOBufferBlock(buf, bytes, docpos);
+ b = new_IOBufferBlock(buf, bytes, doc_pos);
b->_buf_end = b->_end;
vio.buffer.mbuf->append_block(b);
vio.ndone += bytes;
- docpos += bytes;
- if (vio.ntodo() <= 0) {
- calluser(VC_EVENT_READ_COMPLETE);
- return EVENT_DONE;
- } else {
- if (calluser(VC_EVENT_READ_READY))
+ doc_pos += bytes;
+ if (vio.ntodo() <= 0)
+ return calluser(VC_EVENT_READ_COMPLETE);
+ else {
+ if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
return EVENT_DONE;
// we have to keep reading until we give the user all the
// bytes it wanted or we hit the watermark.
@@ -636,12 +659,10 @@
goto Lread;
return EVENT_CONT;
}
-Lread:{
- if (vio.ndone >= doc_len) {
+Lread: {
+ if ((inku32)vio.ndone >= doc_len)
// reached the end of the document and the user still wants more
- calluser(VC_EVENT_EOS);
- return EVENT_DONE;
- }
+ return calluser(VC_EVENT_EOS);
last_collision = 0;
writer_lock_retry = 0;
// if the state machine calls reenable on the callback from the cache,
@@ -651,132 +672,132 @@
cancel_trigger();
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock) {
- SET_HANDLER(&CacheVC::openReadReadDone);
+ SET_HANDLER(&CacheVC::openReadMain);
VC_SCHED_LOCK_RETRY();
}
if (dir_probe(&key, part, &dir, &last_collision)) {
SET_HANDLER(&CacheVC::openReadReadDone);
- do_read(&key);
+ int ret = do_read_call(&key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
return EVENT_CONT;
} else if (write_vc) {
if (writer_done()) {
last_collision = NULL;
while (dir_probe(&earliest_key, part, &dir, &last_collision)) {
if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
- Debug("cache_read_agg", "%x: key: %X ReadMain complete: %d", this, first_key.word(1), vio.ndone);
+ DDebug("cache_read_agg", "%x: key: %X ReadMain complete: %d",
+ this, first_key.word(1), (int)vio.ndone);
doc_len = vio.ndone;
- calluser(VC_EVENT_EOS);
- return EVENT_DONE;
+ goto Leos;
}
}
- Debug("cache_read_agg", "%x: key: %X ReadMain writer aborted: %d", this, first_key.word(1), vio.ndone);
- calluser(VC_EVENT_ERROR);
- return EVENT_DONE;
+ DDebug("cache_read_agg", "%x: key: %X ReadMain writer aborted: %d",
+ this, first_key.word(1), (int)vio.ndone);
+ goto Lerror;
}
- Debug("cache_read_agg", "%x: key: %X ReadMain retrying: %d", this, first_key.word(1), vio.ndone);
- SET_HANDLER(&CacheVC::openReadReadDone);
+ DDebug("cache_read_agg", "%x: key: %X ReadMain retrying: %d", this, first_key.word(1), (int)vio.ndone);
+ SET_HANDLER(&CacheVC::openReadMain);
VC_SCHED_WRITER_RETRY();
}
+ if (is_action_tag_set("cache"))
+ ink_release_assert(false);
+ Warning("Document for %X truncated", earliest_key.word(0));
+ // remove the directory entry
+ dir_delete(&earliest_key, part, &earliest_dir);
}
- Debug("cache_evac", "truncation %X", key.word(0));
-// action_tag_assert("cache", false);
- if (is_action_tag_set("cache")) {
- ink_release_assert(false);
- }
- Warning("Document for %X truncated", earliest_key.word(0));
- // remove the directory entry
- dir_delete_lock(&earliest_key, part, mutex, &earliest_dir);
- calluser(VC_EVENT_ERROR);
- return EVENT_CONT;
+Lerror:
+ return calluser(VC_EVENT_ERROR);
+Leos:
+ return calluser(VC_EVENT_EOS);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0);
}
-
+/*
+ This code follows CacheVC::openReadStartHead closely,
+ if you change this you might have to change that.
+*/
int
CacheVC::openReadStartEarliest(int event, Event * e)
{
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
- intptr_t err = ECACHE_NO_DOC;
+ int ret = 0;
Doc *doc = NULL;
-
cancel_trigger();
set_io_not_in_progress();
if (_action.cancelled)
return free_CacheVC(this);
- if (!buf)
- goto Lcollision;
- if (!io.ok())
- goto Ldone;
- // an object needs to be outside the aggregation window in order to be
- // be evacuated as it is read
- if (!dir_agg_valid(part, &dir)) {
- // a directory entry which is nolonger valid may have been overwritten
- if (!dir_valid(part, &dir))
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock)
+ VC_SCHED_LOCK_RETRY();
+ if (!buf)
+ goto Lread;
+ if (!io.ok())
+ goto Ldone;
+ // an object needs to be outside the aggregation window in order to be
+ // be evacuated as it is read
+ if (!dir_agg_valid(part, &dir)) {
+ // a directory entry which is nolonger valid may have been overwritten
+ if (!dir_valid(part, &dir))
+ last_collision = NULL;
+ goto Lread;
+ }
+ doc = (Doc *) buf->data();
+ if (doc->magic != DOC_MAGIC) {
+ char tmpstring[100];
+ if (is_action_tag_set("cache")) {
+ ink_release_assert(false);
+ }
+ if (doc->magic == DOC_CORRUPT)
+ Warning("Earliest: Doc checksum does not match for %s", key.string(tmpstring));
+ else
+ Warning("Earliest : Doc magic does not match for %s", key.string(tmpstring));
+ // remove the dir entry
+ dir_delete(&key, part, &dir);
+ // try going through the directory entries again
+ // in case the dir entry we deleted doesnt correspond
+ // to the key we are looking for. This is possible
+ // because of directory collisions
last_collision = NULL;
- goto Lcollision;
- }
- doc = (Doc *) buf->data();
- if (doc->magic != DOC_MAGIC) {
- char tmpstring[100];
- //action_tag_assert("cache", false);
- if (is_action_tag_set("cache")) {
- ink_release_assert(false);
+ goto Lread;
}
-
- if (doc->magic == DOC_CORRUPT)
- Warning("Earliest: Doc checksum does not match for %s", key.string(tmpstring));
- else
- Warning("Earliest : Doc magic does not match for %s", key.string(tmpstring));
- // remove the dir entry
- dir_delete(&key, part, &dir);
- // try going through the directory entries again
- // in case the dir entry we deleted doesnt correspond
- // to the key we are looking for. This is possible
- // because of directory collisions
- last_collision = NULL;
- goto Lcollision;
- }
- if (!(doc->key == key))
- goto Lcollision;
- // success
- earliest_key = key;
- docpos = sizeofDoc + doc->hlen;
- next_CacheKey(&key, &doc->key);
- if (part->begin_read_lock(this) < 0)
- VC_SCHED_LOCK_RETRY();
-
+ if (!(doc->key == key)) // collisiion
+ goto Lread;
+ // success
+ earliest_key = key;
+ doc_pos = doc->prefix_len();
+ next_CacheKey(&key, &doc->key);
+ part->begin_read(this);
#ifdef HIT_EVACUATE
- if (part->within_hit_evacuate_window(&earliest_dir) &&
- (!cache_config_hit_evacuate_size_limit || doc_len <= cache_config_hit_evacuate_size_limit)) {
- Debug("cache_hit_evac", "dir: %d, write: %d, phase: %d",
- dir_offset(&earliest_dir), offset_to_part_offset(part, part->header->write_pos), part->header->phase);
- f.hit_evacuate = 1;
- }
+ if (part->within_hit_evacuate_window(&earliest_dir) &&
+ (!cache_config_hit_evacuate_size_limit || doc_len <= cache_config_hit_evacuate_size_limit)) {
+ DDebug("cache_hit_evac", "dir: %d, write: %d, phase: %d",
+ dir_offset(&earliest_dir), offset_to_part_offset(part, part->header->write_pos), part->header->phase);
+ f.hit_evacuate = 1;
+ }
#endif
- if (write_vc)
- CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
-
- SET_HANDLER(&CacheVC::openReadMain);
- _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
- return EVENT_DONE;
-
-Lcollision:{
- CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (!lock)
- VC_SCHED_LOCK_RETRY();
- if (dir_probe(&key, part, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, part, &earliest_dir, NULL)) {
+ goto Lsuccess;
+Lread:
+ if (dir_probe(&key, part, &earliest_dir, &last_collision) ||
+ dir_lookaside_probe(&key, part, &earliest_dir, NULL))
+ {
dir = earliest_dir;
- return do_read(&key);
+ if ((ret = do_read_call(&key)) == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
}
// read has detected that alternate does not exist in the cache.
// rewrite the vector.
#ifdef HTTP_CACHE
- if (!f.read_from_writer_called && f.http_request) {
+ if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
// don't want any writers while we are evacuating the vector
if (!part->open_write(this, false, 1)) {
Doc *doc1 = (Doc *) first_buf->data();
- int len = write_vector->get_handles(doc1->hdr, doc1->hlen);
+ inku32 len = write_vector->get_handles(doc1->hdr(), doc1->hlen);
ink_assert(len == doc1->hlen && write_vector->count() > 0);
write_vector->remove(alternate_index, true);
// if the vector had one alternate, delete it's directory entry
@@ -813,22 +834,31 @@
od->move_resident_alt = 1;
od->single_doc_key = doc1->key;
dir_assign(&od->single_doc_dir, &dir);
- dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(1));
+ dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
}
SET_HANDLER(&CacheVC::openReadVecWrite);
- return do_write();
+ if ((ret = do_write_call()) == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
}
}
}
#endif
// open write failure - another writer, so don't modify the vector
+ Ldone:
+ if (od)
+ part->close_write(this);
}
-Ldone:
- if (od)
- part->close_write_lock(this);
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
- _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -err);
+ _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
return free_CacheVC(this);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
+Lsuccess:
+ if (write_vc)
+ CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
+ SET_HANDLER(&CacheVC::openReadMain);
+ return callcont(CACHE_EVENT_OPEN_READ);
}
// create the directory entry after the vector has been evacuated
@@ -840,53 +870,59 @@
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
- ink_debug_assert(part->mutex->thread_holding == this_ethread());
- intptr_t err = ECACHE_ALT_MISS;
- ink_assert(event == AIO_EVENT_DONE);
+ cancel_trigger();
set_io_not_in_progress();
ink_assert(od);
od->writing_vec = 0;
if (_action.cancelled)
return openWriteCloseDir(EVENT_IMMEDIATE, 0);
-
- if (io.ok()) {
- ink_assert(f.evac_vector);
- ink_assert(f.http_request);
- ink_assert(!buf.m_ptr);
- f.evac_vector = false;
- last_collision = NULL;
- f.update = 0;
- alternate_index = CACHE_ALT_INDEX_DEFAULT;
- f.use_first_key = 0;
- vio.op = VIO::READ;
- dir_overwrite(&first_key, part, &dir, &od->first_dir);
- if (od->move_resident_alt) {
- dir_insert(&od->single_doc_key, part, &od->single_doc_dir);
- }
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock)
+ VC_SCHED_LOCK_RETRY();
+ if (io.ok()) {
+ ink_assert(f.evac_vector);
+ ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
+ ink_assert(!buf.m_ptr);
+ f.evac_vector = false;
+ last_collision = NULL;
+ f.update = 0;
+ alternate_index = CACHE_ALT_INDEX_DEFAULT;
+ f.use_first_key = 0;
+ vio.op = VIO::READ;
+ dir_overwrite(&first_key, part, &dir, &od->first_dir);
+ if (od->move_resident_alt)
+ dir_insert(&od->single_doc_key, part, &od->single_doc_dir);
#ifdef FIXME_NONMODULAR
- int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector,
- &request, params);
+ int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
#else
- int alt_ndx = 0;
+ int alt_ndx = 0;
#endif
- part->close_write(this);
- if (alt_ndx >= 0) {
- vector.clear();
- // we don't need to start all over again, since we already
- // have the vector in memory. But this is simpler and this
- // case is rare.
- SET_HANDLER(&CacheVC::openReadStartHead);
- return openReadStartHead(EVENT_IMMEDIATE, 0);
- }
- } else
- part->close_write(this);
+ part->close_write(this);
+ if (alt_ndx >= 0) {
+ vector.clear();
+ // we don't need to start all over again, since we already
+ // have the vector in memory. But this is simpler and this
+ // case is rare.
+ goto Lrestart;
+ }
+ } else
+ part->close_write(this);
+ }
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
- _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -err);
+ _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_ALT_MISS);
return free_CacheVC(this);
+Lrestart:
+ SET_HANDLER(&CacheVC::openReadStartHead);
+ return openReadStartHead(EVENT_IMMEDIATE, 0);
}
#endif
+/*
+ This code follows CacheVC::openReadStartEarliest closely,
+ if you change this you might have to change that.
+*/
int
CacheVC::openReadStartHead(int event, Event * e)
{
@@ -896,140 +932,127 @@
set_io_not_in_progress();
if (_action.cancelled)
return free_CacheVC(this);
- if (!buf)
- goto Lcollision;
- if (!io.ok())
- goto Ldone;
- // an object needs to be outside the aggregation window in order to be
- // be evacuated as it is read
- if (!dir_agg_valid(part, &dir)) {
- // a directory entry which is nolonger valid may have been overwritten
- if (!dir_valid(part, &dir))
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock)
+ VC_SCHED_LOCK_RETRY();
+ if (!buf)
+ goto Lread;
+ if (!io.ok())
+ goto Ldone;
+ // an object needs to be outside the aggregation window in order to
+ // be evacuated as it is read
+ if (!dir_agg_valid(part, &dir)) {
+ // a directory entry which is no longer valid may have been overwritten
+ if (!dir_valid(part, &dir))
+ last_collision = NULL;
+ goto Lread;
+ }
+ doc = (Doc *) buf->data();
+ if (doc->magic != DOC_MAGIC) {
+ char tmpstring[100];
+ if (is_action_tag_set("cache")) {
+ ink_release_assert(false);
+ }
+ if (doc->magic == DOC_CORRUPT)
+ Warning("Head: Doc checksum does not match for %s", key.string(tmpstring));
+ else
+ Warning("Head : Doc magic does not match for %s", key.string(tmpstring));
+ // remove the dir entry
+ dir_delete(&key, part, &dir);
+ // try going through the directory entries again
+ // in case the dir entry we deleted doesn't correspond
+ // to the key we are looking for. This is possible
+ // because of directory collisions
last_collision = NULL;
- goto Lcollision;
- }
- doc = (Doc *) buf->data();
- if (doc->magic != DOC_MAGIC) {
- char tmpstring[100];
- //action_tag_assert("cache", false);
- if (is_action_tag_set("cache")) {
- ink_release_assert(false);
+ goto Lread;
}
- if (doc->magic == DOC_CORRUPT)
- Warning("Head: Doc checksum does not match for %s", key.string(tmpstring));
- else
- Warning("Head : Doc magic does not match for %s", key.string(tmpstring));
- //remove the directory entry
- dir_delete(&key, part, &dir);
- // try going through the directory entries again
- // in case the dir entry we deleted doesnt correspond
- // to the key we are looking for. This is possible
- // because of directory collisions
- last_collision = NULL;
- goto Lcollision;
- }
- if (!(doc->first_key == key))
- goto Lcollision;
-
-
- if (f.lookup) {
- CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
- _action.continuation->handleEvent(CACHE_EVENT_LOOKUP, 0);
- return free_CacheVC(this);
- }
- earliest_dir = dir;
+ if (!(doc->first_key == key))
+ goto Lread;
+ if (f.lookup)
+ goto Lookup;
+ earliest_dir = dir;
#ifdef HTTP_CACHE
- CacheHTTPInfo *alternate_tmp;
- if (f.http_request) {
- ink_assert(doc->hlen);
- if (!doc->hlen)
- goto Ldone;
- if (vector.get_handles(doc->hdr, doc->hlen) != doc->hlen) {
- if (buf) {
- Note("OpenReadHead failed for cachekey %X : vector inconsistency with %d", key.word(0), doc->hlen);
- dir_delete(&key, part, &dir);
+ CacheHTTPInfo *alternate_tmp;
+ if (frag_type == CACHE_FRAG_TYPE_HTTP) {
+ ink_assert(doc->hlen);
+ if (!doc->hlen)
+ goto Ldone;
+ if (vector.get_handles(doc->hdr(), doc->hlen) != doc->hlen) {
+ if (buf) {
+ Note("OpenReadHead failed for cachekey %X : vector inconsistency with %d", key.word(0), doc->hlen);
+ dir_delete(&key, part, &dir);
+ }
+ err = ECACHE_BAD_META_DATA;
+ goto Ldone;
}
- err = ECACHE_BAD_META_DATA;
- goto Ldone;
- }
- if (cache_config_select_alternate) {
+ if (cache_config_select_alternate) {
#ifdef FIXME_NONMODULAR
- alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
+ alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
#else
- alternate_index = 0;
+ alternate_index = 0;
#endif
- if (alternate_index < 0) {
- err = ECACHE_ALT_MISS;
+ if (alternate_index < 0) {
+ err = ECACHE_ALT_MISS;
+ goto Ldone;
+ }
+ } else
+ alternate_index = 0;
+ alternate_tmp = vector.get(alternate_index);
+ if (!alternate_tmp->valid()) {
+ if (buf) {
+ Note("OpenReadHead failed for cachekey %X : alternate inconsistency", key.word(0));
+ dir_delete(&key, part, &dir);
+ }
goto Ldone;
}
+
+ alternate.copy_shallow(alternate_tmp);
+ alternate.object_key_get(&key);
+ doc_len = alternate.object_size_get();
+ if (key == doc->key) { // is this my data?
+ f.single_fragment = doc->single_fragment();
+ ink_assert(f.single_fragment); // otherwise need to read earliest
+ ink_assert(doc->hlen);
+ doc_pos = doc->prefix_len();
+ next_CacheKey(&key, &doc->key);
+ } else {
+ f.single_fragment = false;
+ }
} else
- alternate_index = 0;
- alternate_tmp = vector.get(alternate_index);
- if (!alternate_tmp->valid()) {
- if (buf) {
- Note("OpenReadHead failed for cachekey %X : alternate inconsistency", key.word(0));
- dir_delete(&key, part, &dir);
+#endif
+ {
+ // non-http docs have the total len set in the first fragment
+ if (doc->hlen) {
+ ink_debug_assert(!"Cache::openReadStartHead non-http request" " for http doc");
+ err = -ECACHE_BAD_READ_REQUEST;
+ goto Ldone;
}
- goto Ldone;
- }
-
- alternate.copy_shallow(alternate_tmp);
- alternate.object_key_get(&key);
- doc_len = alternate.object_size_get();
- if (key == doc->key) { // is this my data?
- f.single_segment = doc->single_segment();
- ink_assert(f.single_segment); // otherwise need to read earliest
- ink_assert(doc->hlen);
- docpos = sizeofDoc + doc->hlen;
next_CacheKey(&key, &doc->key);
- } else {
- f.single_segment = false;
+ f.single_fragment = doc->single_fragment();
+ doc_pos = doc->prefix_len();
+ doc_len = doc->total_len;
+ }
+ // the first fragment might have been gc'ed. Make sure the first
+ // fragment is there before returning CACHE_EVENT_OPEN_READ
+ if (!f.single_fragment)
+ goto Learliest;
+
+#ifdef HIT_EVACUATE
+ if (part->within_hit_evacuate_window(&dir) &&
+ (!cache_config_hit_evacuate_size_limit || doc_len <= cache_config_hit_evacuate_size_limit)) {
+ DDebug("cache_hit_evac", "dir: %d, write: %d, phase: %d",
+ dir_offset(&dir), offset_to_part_offset(part, part->header->write_pos), part->header->phase);
+ f.hit_evacuate = 1;
}
- } else
#endif
- {
- // non-http docs have the total len set in the first segment
- if (doc->hlen) {
- ink_debug_assert(!"Cache::openReadStartHead non-http request" " for http doc");
- err = -ECACHE_BAD_READ_REQUEST;
- goto Ldone;
- }
- next_CacheKey(&key, &doc->key);
- f.single_segment = doc->single_segment();
- docpos = sizeofDoc + doc->hlen;
- doc_len = doc->total_len;
- }
- // the first fragment might have been gc'ed. Make sure the first
- // fragment is there before returning CACHE_EVENT_OPEN_READ
- if (!f.single_segment) {
+
first_buf = buf;
- buf = NULL;
- earliest_key = key;
- last_collision = NULL;
- SET_HANDLER(&CacheVC::openReadStartEarliest);
- return openReadStartEarliest(event, e);
- }
-#ifdef HIT_EVACUATE
- if (part->within_hit_evacuate_window(&dir) &&
- (!cache_config_hit_evacuate_size_limit || doc_len <= cache_config_hit_evacuate_size_limit)) {
- Debug("cache_hit_evac", "dir: %d, write: %d, phase: %d",
- dir_offset(&dir), offset_to_part_offset(part, part->header->write_pos), part->header->phase);
- f.hit_evacuate = 1;
- }
-#endif
+ part->begin_read(this);
- first_buf = buf;
- if (part->begin_read_lock(this) < 0)
- VC_SCHED_LOCK_RETRY();
- // success
- SET_HANDLER(&CacheVC::openReadMain);
- _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
- return EVENT_DONE;
+ goto Lsuccess;
-Lcollision:{
- CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (!lock)
- VC_SCHED_LOCK_RETRY();
+ Lread:
// check for collision
// INKqa07684 - Cache::lookup returns CACHE_EVENT_OPEN_READ_FAILED.
// don't want to go through this BS of reading from a writer if
@@ -1042,12 +1065,16 @@
goto Ldone;
}
od = cod;
+ MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openReadFromWriter);
return handleEvent(EVENT_IMMEDIATE, 0);
}
if (dir_probe(&key, part, &dir, &last_collision)) {
first_dir = dir;
- return do_read(&key);
+ int ret = do_read_call(&key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
}
}
Ldone:
@@ -1059,4 +1086,20 @@
_action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *) -err);
}
return free_CacheVC(this);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
+Lsuccess:
+ SET_HANDLER(&CacheVC::openReadMain);
+ return callcont(CACHE_EVENT_OPEN_READ);
+Lookup:
+ CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
+ _action.continuation->handleEvent(CACHE_EVENT_LOOKUP, 0);
+ return free_CacheVC(this);
+Learliest:
+ first_buf = buf;
+ buf = NULL;
+ earliest_key = key;
+ last_collision = NULL;
+ SET_HANDLER(&CacheVC::openReadStartEarliest);
+ return openReadStartEarliest(event, e);
}