You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2009/12/18 17:48:16 UTC
svn commit: r892310 [2/4] - in
/incubator/trafficserver/traffic/branches/dev: ./ iocore/aio/ iocore/cache/
iocore/cluster/ iocore/eventsystem/ iocore/net/ libinktomi++/ proxy/
proxy/http2/
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc Fri Dec 18 16:47:37 2009
@@ -29,60 +29,59 @@
Action * // open_read: start a cache read for `key`; yields ACTION_RESULT_DONE or the action of the CacheVC created here
Cache::open_read(Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len) // NOTE(review): diff view - '-' lines are the removed body, '+' lines its replacement
{
-
if (!(CacheProcessor::cache_ready & type)) { // cache_ready bit for this fragment type is clear: report -ECACHE_NOT_READY to cont and finish
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
-
ink_assert(caches[type] == this);
Part *part = key_to_part(key, hostname, host_len);
Dir result, *last_collision = NULL;
ProxyMutex *mutex = cont->mutex;
-
- CacheVC *c = new_CacheVC(cont);
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
- c->vio.op = VIO::READ;
- c->base_stat = cache_read_active_stat;
- CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
- c->first_key = c->key = c->earliest_key = *key;
- c->part = part;
- if (type == CACHE_FRAG_TYPE_HTTP) {
- c->f.http_request = 1;
- }
-
- CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (lock) {
- c->od = part->open_read(key);
- if (c->od) {
- // someone is currently writing the document
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
- if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE;
+ OpenDirEntry *od = NULL;
+ CacheVC *c = NULL; // stays NULL on the miss path, so Lmiss has nothing to free
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock || (od = part->open_read(key)) || dir_probe(key, part, &result, &last_collision)) { // build a CacheVC only if the lock was missed, an open entry exists, or the directory probe hit
+ c = new_CacheVC(cont);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+ c->vio.op = VIO::READ;
+ c->base_stat = cache_read_active_stat;
+ CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+ c->first_key = c->key = c->earliest_key = *key;
+ c->part = part;
+ c->frag_type = type;
+ c->od = od; // NULL unless part->open_read(key) returned an entry
}
- // no writer
- if (!dir_probe(key, part, &result, &last_collision)) {
- //release the lock before calling handleEvent??
- CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
- cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
- free_CacheVC(c);
- return ACTION_RESULT_DONE;
+ if (!lock) { // lock missed: schedule a retry on the CacheVC, whose handler is openReadStartHead
+ CONT_SCHED_LOCK_RETRY(c);
+ return &c->_action;
}
-
+ if (!c) // lock held, no open entry and no directory hit
+ goto Lmiss;
+ if (c->od) // an open entry was found for this key
+ goto Lwriter;
c->dir = result;
c->last_collision = last_collision;
+ switch(c->do_read_call(&c->key)) { // directory hit: start the read and map its event code to an Action result
+ case EVENT_DONE: return ACTION_RESULT_DONE;
+ case EVENT_RETURN: goto Lcallreturn;
+ default: return &c->_action;
+ }
}
- if (!lock) {
- CONT_SCHED_LOCK_RETRY(c);
- return &c->_action;
- }
- if (c->do_read(&c->key) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE; // ram cache hit
+Lmiss: // count the failure and report -ECACHE_NO_DOC to the caller
+ CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
+ cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
+ return ACTION_RESULT_DONE;
+Lwriter: // switch the CacheVC to openReadFromWriter and drive it once with EVENT_IMMEDIATE
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+ if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
+Lcallreturn: // do_read_call returned EVENT_RETURN: deliver AIO_EVENT_DONE to the CacheVC's current handler
+ if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
}
#ifdef HTTP_CACHE
@@ -95,59 +94,62 @@
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
-
ink_assert(caches[type] == this);
Part *part = key_to_part(key, hostname, host_len);
Dir result, *last_collision = NULL;
ProxyMutex *mutex = cont->mutex;
+ OpenDirEntry *od = NULL;
+ CacheVC *c = NULL;
- CacheVC *c = new_CacheVC(cont);
- c->first_key = c->key = c->earliest_key = *key;
- c->part = part;
- c->vio.op = VIO::READ;
- c->base_stat = cache_read_active_stat;
- CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
- c->request.copy_shallow(request);
- c->f.http_request = 1;
- c->params = params;
-
- CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (lock) {
- c->od = part->open_read(key);
- if (c->od) {
- //Type-casting the continuation to HttpCacheSM object
- HttpCacheSM *cachesm = (HttpCacheSM *) cont;
- //Setting the read_while_write_inprogress flag
- cachesm->set_readwhilewrite_inprogress(true);
-
- // someone is currently writing the document
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
- if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE;
- }
- if (!dir_probe(key, part, &result, &last_collision)) {
- //release the lock before calling handleEvent???
- CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
- cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
- free_CacheVC(c);
- return ACTION_RESULT_DONE;
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock || (od = part->open_read(key)) || dir_probe(key, part, &result, &last_collision)) {
+ c = new_CacheVC(cont);
+ c->first_key = c->key = c->earliest_key = *key;
+ c->part = part;
+ c->vio.op = VIO::READ;
+ c->base_stat = cache_read_active_stat;
+ CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+ c->request.copy_shallow(request);
+ c->frag_type = CACHE_FRAG_TYPE_HTTP;
+ c->params = params;
+ c->od = od;
}
+ if (!lock) {
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+ CONT_SCHED_LOCK_RETRY(c);
+ return &c->_action;
+ }
+ if (!c)
+ goto Lmiss;
+ if (c->od)
+ goto Lwriter;
+ // hit
c->dir = c->first_dir = result;
c->last_collision = last_collision;
-
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+ switch(c->do_read_call(&c->key)) {
+ case EVENT_DONE: return ACTION_RESULT_DONE;
+ case EVENT_RETURN: goto Lcallreturn;
+ default: return &c->_action;
+ }
}
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
- if (!lock) {
- CONT_SCHED_LOCK_RETRY(c);
- return &c->_action;
- }
- if (c->do_read(&c->key) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE; // ram cache hit
+Lmiss:
+ CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
+ cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
+ return ACTION_RESULT_DONE;
+Lwriter:
+ // this is a horrible violation of the interface and should be fixed (FIXME)
+ ((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+ if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
+Lcallreturn:
+ if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
}
#endif
@@ -173,7 +175,7 @@
int err = ECACHE_DOC_BUSY;
CacheVC *w = NULL;
- if (!f.http_request) {
+ if (frag_type != CACHE_FRAG_TYPE_HTTP) {
ink_assert(od->num_writers == 1);
w = od->writers.head;
if (w->start_time > start_time || w->closed < 0) {
@@ -254,7 +256,7 @@
}
vector.clear(false);
if (!write_vc) {
- Debug("cache_read_agg", "%x: key: %X writer alternate different: %d", this, first_key.word(1), alternate_index);
+ Debug("cache_read_agg", "%x: key: %X %X writer alternate different: %d", this, first_key.word(1), alternate_index);
od = NULL;
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(EVENT_IMMEDIATE, 0);
@@ -293,10 +295,8 @@
if (_action.cancelled)
return free_CacheVC(this);
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (!lock) {
- ink_assert(!od);
+ if (!lock)
VC_SCHED_LOCK_RETRY();
- }
if (!od) {
od = part->open_read(&first_key);
if (!od) {
@@ -306,7 +306,6 @@
}
} else
ink_debug_assert(od == part->open_read(&first_key));
-
if (!write_vc) {
int ret = openReadChooseWriter(event, e);
if (ret == EVENT_DONE || !write_vc)
@@ -331,13 +330,13 @@
return openReadStartHead(EVENT_IMMEDIATE, 0);
}
// allow reading from unclosed writer for http requests only.
- ink_assert(f.http_request || write_vc->closed);
- if (!write_vc->closed && !write_vc->segment) {
- if (!cache_config_read_while_writer || !f.http_request)
+ ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
+ if (!write_vc->closed && !write_vc->fragment) {
+ if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP)
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
Debug("cache_read_agg",
- "%x: key: %X writer: closed:%d, segment:%d, retry: %d",
- this, first_key.word(1), write_vc->closed, write_vc->segment, writer_lock_retry);
+ "%x: key: %X writer: closed:%d, fragment:%d, retry: %d",
+ this, first_key.word(1), write_vc->closed, write_vc->fragment, writer_lock_retry);
VC_SCHED_WRITER_RETRY();
}
@@ -350,10 +349,10 @@
if (!write_vc->io.ok())
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
#ifdef HTTP_CACHE
- if (f.http_request) {
+ if (frag_type == CACHE_FRAG_TYPE_HTTP) {
Debug("cache_read_agg",
- "%x: key: %X http passed stage 1, closed: %d, seg: %d",
- this, first_key.word(1), write_vc->closed, write_vc->segment);
+ "%x: key: %X http passed stage 1, closed: %d, frag: %d",
+ this, first_key.word(1), write_vc->closed, write_vc->fragment);
if (!write_vc->alternate.valid())
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
alternate.copy(&write_vc->alternate);
@@ -378,11 +377,11 @@
// header only update. The first_buf of the writer has the
// document body.
Doc *doc = (Doc *) write_vc->first_buf->data();
- writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), sizeofDoc + doc->hlen);
+ writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
MUTEX_RELEASE(writer_lock);
ink_assert(doc_len == doc->data_len());
length = doc_len;
- f.single_segment = 1;
+ f.single_fragment = 1;
docpos = 0;
earliest_key = key;
dir_clean(&first_dir);
@@ -406,12 +405,12 @@
#ifdef HTTP_CACHE
}
#endif
- if (write_vc->segment) {
+ if (write_vc->fragment) {
doc_len = write_vc->vio.nbytes;
last_collision = NULL;
Debug("cache_read_agg",
- "%x: key: %X closed: %d, segment: %d, len: %d starting first segment",
- this, first_key.word(1), write_vc->closed, write_vc->segment, doc_len);
+ "%x: key: %X closed: %d, fragment: %d, len: %d starting first fragment",
+ this, first_key.word(1), write_vc->closed, write_vc->fragment, (int)doc_len);
MUTEX_RELEASE(writer_lock);
// either a header + body update or a new document
SET_HANDLER(&CacheVC::openReadStartEarliest);
@@ -421,14 +420,14 @@
writer_offset = write_vc->offset;
length = write_vc->length;
//copy the vector
- f.single_segment = !write_vc->segment; //single segment doc
+ f.single_fragment = !write_vc->fragment; //single fragment doc
docpos = 0;
earliest_key = write_vc->earliest_key;
ink_assert(earliest_key == key);
doc_len = write_vc->total_len;
dir_clean(&first_dir);
dir_clean(&earliest_dir);
- Debug("cache_read_agg", "%x key: %X %X: single segment read", first_key.word(1), key.word(0));
+ Debug("cache_read_agg", "%x key: %X %X: single fragment read", first_key.word(1), key.word(0));
MUTEX_RELEASE(writer_lock);
//we've got everything....ready to roll!!
@@ -446,20 +445,20 @@
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
- IOBufferBlock *b = NULL;
- int ntodo = vio.ntodo();
cancel_trigger();
+ if (seek_to) {
+ vio.ndone = seek_to;
+ seek_to = 0;
+ }
+ IOBufferBlock *b = NULL;
+ ink64 ntodo = vio.ntodo();
if (ntodo <= 0)
return EVENT_CONT;
- if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
- return EVENT_CONT;
-
if (length < (doc_len - vio.ndone)) {
Debug("cache_read_agg", "truncation %X", earliest_key.word(0));
if (is_action_tag_set("cache")) {
ink_release_assert(false);
}
-// action_tag_assert("cache", false);
Warning("Document for %X truncated", earliest_key.word(0));
calluser(VC_EVENT_ERROR);
return EVENT_CONT;
@@ -467,22 +466,20 @@
/* its possible that the user did a do_io_close before
openWriteWriteDone was called. */
if (length > (doc_len - vio.ndone)) {
- int skip_bytes = length - (doc_len - vio.ndone);
- iobufferblock_skip(writer_buf, &writer_offset, &length, skip_bytes);
+ ink64 skip_bytes = length - (doc_len - vio.ndone);
+ iobufferblock_skip(writer_buf, &writer_offset, (int*)&length, skip_bytes);
}
- int bytes = length;
- if (length > vio.ntodo())
+ ink64 bytes = length;
+ if (bytes > vio.ntodo())
bytes = vio.ntodo();
-
- if (vio.ndone >= doc_len) {
+ if (vio.ndone >= (ink64)doc_len) {
ink_assert(bytes <= 0);
// reached the end of the document and the user still wants more
calluser(VC_EVENT_EOS);
return EVENT_DONE;
}
b = iobufferblock_clone(writer_buf, writer_offset, bytes);
- writer_buf = iobufferblock_skip(writer_buf, &writer_offset, &length, bytes);
-
+ writer_buf = iobufferblock_skip(writer_buf, &writer_offset, (int*)&length, bytes);
vio.buffer.mbuf->append_block(b);
vio.ndone += bytes;
if (vio.ntodo() <= 0) {
@@ -496,7 +493,6 @@
}
}
-
int
CacheVC::openReadClose(int event, Event * e)
{
@@ -509,12 +505,12 @@
return EVENT_CONT;
set_io_not_in_progress();
}
- MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
VC_SCHED_LOCK_RETRY();
#ifdef HIT_EVACUATE
if (f.hit_evacuate && dir_valid(part, &first_dir) && closed > 0) {
- if (f.single_segment)
+ if (f.single_fragment)
part->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
else if (dir_valid(part, &earliest_dir)) {
part->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
@@ -536,7 +532,7 @@
return EVENT_CONT;
set_io_not_in_progress();
if (event == AIO_EVENT_DONE && !io.ok())
- goto Ldone;
+ goto Lerror;
if (last_collision && // no missed lock
dir_valid(part, &dir)) // object still valid
{
@@ -547,11 +543,11 @@
Warning("Middle: Doc checksum does not match for %s", key.string(tmpstring));
else
Warning("Middle: Doc magic does not match for %s", key.string(tmpstring));
- goto Ldone;
+ goto Lerror;
}
-
if (doc->key == key) {
- docpos = sizeofDoc + doc->hlen;
+ fragment++;
+ docpos = doc->prefix_len();
next_CacheKey(&key, &key);
SET_HANDLER(&CacheVC::openReadMain);
return openReadMain(event, e);
@@ -561,42 +557,48 @@
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
VC_SCHED_LOCK_RETRY();
-
if (last_collision && dir_offset(&dir) != dir_offset(last_collision))
last_collision = 0; // object has been/is being overwritten
if (dir_probe(&key, part, &dir, &last_collision)) {
- do_read(&key);
+ int ret = do_read_call(&key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
return EVENT_CONT;
} else if (write_vc) {
if (writer_done()) {
last_collision = NULL;
while (dir_probe(&earliest_key, part, &dir, &last_collision)) {
if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
- Debug("cache_read_agg", "%x: key: %X ReadRead complete: %d", this, first_key.word(1), vio.ndone);
+ Debug("cache_read_agg", "%x: key: %X ReadRead complete: %d",
+ this, first_key.word(1), (int)vio.ndone);
doc_len = vio.ndone;
+ MUTEX_RELEASE(lock);
calluser(VC_EVENT_EOS);
return EVENT_DONE;
}
}
- Debug("cache_read_agg", "%x: key: %X ReadRead writer aborted: %d", this, first_key.word(1), vio.ndone);
+ Debug("cache_read_agg", "%x: key: %X ReadRead writer aborted: %d",
+ this, first_key.word(1), (int)vio.ndone);
+ MUTEX_RELEASE(lock);
calluser(VC_EVENT_ERROR);
return EVENT_DONE;
}
- Debug("cache_read_agg", "%x: key: %X ReadRead retrying: %d", this, first_key.word(1), vio.ndone);
+ Debug("cache_read_agg", "%x: key: %X ReadRead retrying: %d", this, first_key.word(1), (int)vio.ndone);
VC_SCHED_WRITER_RETRY();
}
}
-Ldone:
-// action_tag_assert("cache", false);
- if (is_action_tag_set("cache")) {
+Lerror:
+ if (is_action_tag_set("cache"))
ink_release_assert(false);
- }
// remove the directory entry
dir_delete_lock(&earliest_key, part, mutex, &earliest_dir);
char tmpstring[100];
Warning("Document truncated for %s", earliest_key.string(tmpstring));
calluser(VC_EVENT_ERROR);
return EVENT_CONT;
+Lcallreturn:
+ handleEvent(AIO_EVENT_DONE, 0);
+ return EVENT_CONT;
}
int
@@ -605,18 +607,43 @@
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
- IOBufferBlock *b = NULL;
- int ntodo = vio.ntodo();
cancel_trigger();
+ Doc *doc = (Doc *) buf->data();
+ ink64 ntodo = vio.ntodo();
+ ink64 bytes = doc->len - docpos;
+ IOBufferBlock *b = NULL;
+ if (seek_to) { // handle do_io_pread
+ if (seek_to >= doc_len) {
+ vio.ndone = doc_len;
+ calluser(VC_EVENT_EOS);
+ return EVENT_DONE;
+ }
+ if (f.single_fragment) {
+ vio.ndone = seek_to;
+ seek_to = 0;
+ } else {
+ if (seek_to < doc->data_len()) {
+ vio.ndone += seek_to;
+ seek_to = 0;
+ } else {
+ Frag *frag = doc->frags();
+ // skip to correct key, key is already set to next fragment
+ for (inku32 i = 1; i <= doc->nfrags(); i++) {
+ if (seek_to < frag[i].offset) break;
+ next_CacheKey(&key, &key);
+ }
+ seek_to = 0;
+ vio.ndone = seek_to;
+ goto Lread;
+ }
+ }
+ }
if (ntodo <= 0)
return EVENT_CONT;
if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
return EVENT_CONT;
- Doc *doc = (Doc *) buf->data();
- int bytes = doc->len - docpos;
- if ((bytes <= 0) && vio.ntodo() >= 0) {
+ if ((bytes <= 0) && vio.ntodo() >= 0)
goto Lread;
- }
if (bytes > vio.ntodo())
bytes = vio.ntodo();
b = new_IOBufferBlock(buf, bytes, docpos);
@@ -636,8 +663,8 @@
goto Lread;
return EVENT_CONT;
}
-Lread:{
- if (vio.ndone >= doc_len) {
+Lread: {
+ if ((inku32)vio.ndone >= doc_len) {
// reached the end of the document and the user still wants more
calluser(VC_EVENT_EOS);
return EVENT_DONE;
@@ -656,50 +683,54 @@
}
if (dir_probe(&key, part, &dir, &last_collision)) {
SET_HANDLER(&CacheVC::openReadReadDone);
- do_read(&key);
+ int ret = do_read_call(&key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
return EVENT_CONT;
} else if (write_vc) {
if (writer_done()) {
last_collision = NULL;
while (dir_probe(&earliest_key, part, &dir, &last_collision)) {
if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
- Debug("cache_read_agg", "%x: key: %X ReadMain complete: %d", this, first_key.word(1), vio.ndone);
+ Debug("cache_read_agg", "%x: key: %X ReadMain complete: %d",
+ this, first_key.word(1), (int)vio.ndone);
doc_len = vio.ndone;
+ MUTEX_RELEASE(lock);
calluser(VC_EVENT_EOS);
return EVENT_DONE;
}
}
- Debug("cache_read_agg", "%x: key: %X ReadMain writer aborted: %d", this, first_key.word(1), vio.ndone);
- calluser(VC_EVENT_ERROR);
- return EVENT_DONE;
+ Debug("cache_read_agg", "%x: key: %X ReadMain writer aborted: %d",
+ this, first_key.word(1), (int)vio.ndone);
+ goto Lerror;
}
- Debug("cache_read_agg", "%x: key: %X ReadMain retrying: %d", this, first_key.word(1), vio.ndone);
+ Debug("cache_read_agg", "%x: key: %X ReadMain retrying: %d", this, first_key.word(1), (int)vio.ndone);
SET_HANDLER(&CacheVC::openReadReadDone);
VC_SCHED_WRITER_RETRY();
}
+ Debug("cache_evac", "truncation %X", key.word(0));
+ if (is_action_tag_set("cache"))
+ ink_release_assert(false);
+ Warning("Document for %X truncated", earliest_key.word(0));
+ // remove the directory entry
+ dir_delete(&earliest_key, part, &earliest_dir);
}
- Debug("cache_evac", "truncation %X", key.word(0));
-// action_tag_assert("cache", false);
- if (is_action_tag_set("cache")) {
- ink_release_assert(false);
- }
- Warning("Document for %X truncated", earliest_key.word(0));
- // remove the directory entry
- dir_delete_lock(&earliest_key, part, mutex, &earliest_dir);
+Lerror:
calluser(VC_EVENT_ERROR);
+ return EVENT_DONE;
+Lcallreturn:
+ handleEvent(AIO_EVENT_DONE, 0);
return EVENT_CONT;
}
-
int
CacheVC::openReadStartEarliest(int event, Event * e)
{
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
- int err = ECACHE_NO_DOC;
+ int ret = 0;
Doc *doc = NULL;
-
cancel_trigger();
set_io_not_in_progress();
if (_action.cancelled)
@@ -723,7 +754,6 @@
if (is_action_tag_set("cache")) {
ink_release_assert(false);
}
-
if (doc->magic == DOC_CORRUPT)
Warning("Earliest: Doc checksum does not match for %s", key.string(tmpstring));
else
@@ -739,13 +769,12 @@
}
if (!(doc->key == key))
goto Lcollision;
+ if (part->begin_read_lock(this) < 0)
+ VC_SCHED_LOCK_RETRY();
// success
earliest_key = key;
- docpos = sizeofDoc + doc->hlen;
+ docpos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
- if (part->begin_read_lock(this) < 0)
- VC_SCHED_LOCK_RETRY();
-
#ifdef HIT_EVACUATE
if (part->within_hit_evacuate_window(&earliest_dir) &&
(!cache_config_hit_evacuate_size_limit || doc_len <= cache_config_hit_evacuate_size_limit)) {
@@ -756,27 +785,28 @@
#endif
if (write_vc)
CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
-
SET_HANDLER(&CacheVC::openReadMain);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
return EVENT_DONE;
-Lcollision:{
+Lcollision: {
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
VC_SCHED_LOCK_RETRY();
if (dir_probe(&key, part, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, part, &earliest_dir, NULL)) {
dir = earliest_dir;
- return do_read(&key);
+ if ((ret = do_read_call(&key)) == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
}
// read has detected that alternate does not exist in the cache.
// rewrite the vector.
#ifdef HTTP_CACHE
- if (!f.read_from_writer_called && f.http_request) {
+ if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
// don't want any writers while we are evacuating the vector
if (!part->open_write(this, false, 1)) {
Doc *doc1 = (Doc *) first_buf->data();
- int len = write_vector->get_handles(doc1->hdr, doc1->hlen);
+ inku32 len = write_vector->get_handles(doc1->hdr(), doc1->hlen);
ink_assert(len == doc1->hlen && write_vector->count() > 0);
write_vector->remove(alternate_index, true);
// if the vector had one alternate, delete it's directory entry
@@ -813,10 +843,12 @@
od->move_resident_alt = 1;
od->single_doc_key = doc1->key;
dir_assign(&od->single_doc_dir, &dir);
- dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(1));
+ dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
}
SET_HANDLER(&CacheVC::openReadVecWrite);
- return do_write();
+ if ((ret = do_write_call()) == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
}
}
}
@@ -827,8 +859,10 @@
if (od)
part->close_write_lock(this);
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
- _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -err);
+ _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
return free_CacheVC(this);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
}
// create the directory entry after the vector has been evacuated
@@ -851,7 +885,7 @@
if (io.ok()) {
ink_assert(f.evac_vector);
- ink_assert(f.http_request);
+ ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
ink_assert(!buf.m_ptr);
f.evac_vector = false;
last_collision = NULL;
@@ -890,6 +924,9 @@
int
CacheVC::openReadStartHead(int event, Event * e)
{
+ NOWARN_UNUSED(e);
+ NOWARN_UNUSED(event);
+
int err = ECACHE_NO_DOC;
Doc *doc = NULL;
cancel_trigger();
@@ -919,7 +956,7 @@
Warning("Head: Doc checksum does not match for %s", key.string(tmpstring));
else
Warning("Head : Doc magic does not match for %s", key.string(tmpstring));
- //remove the directory entry
+ // remove the dir entry
dir_delete(&key, part, &dir);
// try going through the directory entries again
// in case the dir entry we deleted doesnt correspond
@@ -930,8 +967,6 @@
}
if (!(doc->first_key == key))
goto Lcollision;
-
-
if (f.lookup) {
CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
_action.continuation->handleEvent(CACHE_EVENT_LOOKUP, 0);
@@ -940,11 +975,11 @@
earliest_dir = dir;
#ifdef HTTP_CACHE
CacheHTTPInfo *alternate_tmp;
- if (f.http_request) {
+ if (frag_type == CACHE_FRAG_TYPE_HTTP) {
ink_assert(doc->hlen);
if (!doc->hlen)
goto Ldone;
- if (vector.get_handles(doc->hdr, doc->hlen) != doc->hlen) {
+ if (vector.get_handles(doc->hdr(), doc->hlen) != doc->hlen) {
if (buf) {
Note("OpenReadHead failed for cachekey %X : vector inconsistency with %d", key.word(0), doc->hlen);
dir_delete(&key, part, &dir);
@@ -977,31 +1012,31 @@
alternate.object_key_get(&key);
doc_len = alternate.object_size_get();
if (key == doc->key) { // is this my data?
- f.single_segment = doc->single_segment();
- ink_assert(f.single_segment); // otherwise need to read earliest
+ f.single_fragment = doc->single_fragment();
+ ink_assert(f.single_fragment); // otherwise need to read earliest
ink_assert(doc->hlen);
- docpos = sizeofDoc + doc->hlen;
+ docpos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
} else {
- f.single_segment = false;
+ f.single_fragment = false;
}
} else
#endif
{
- // non-http docs have the total len set in the first segment
+ // non-http docs have the total len set in the first fragment
if (doc->hlen) {
ink_debug_assert(!"Cache::openReadStartHead non-http request" " for http doc");
err = -ECACHE_BAD_READ_REQUEST;
goto Ldone;
}
next_CacheKey(&key, &doc->key);
- f.single_segment = doc->single_segment();
- docpos = sizeofDoc + doc->hlen;
+ f.single_fragment = doc->single_fragment();
+ docpos = doc->prefix_len();
doc_len = doc->total_len;
}
// the first fragment might have been gc'ed. Make sure the first
// fragment is there before returning CACHE_EVENT_OPEN_READ
- if (!f.single_segment) {
+ if (!f.single_fragment) {
first_buf = buf;
buf = NULL;
earliest_key = key;
@@ -1021,7 +1056,7 @@
first_buf = buf;
if (part->begin_read_lock(this) < 0)
VC_SCHED_LOCK_RETRY();
- // success
+
SET_HANDLER(&CacheVC::openReadMain);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
return EVENT_DONE;
@@ -1042,12 +1077,16 @@
goto Ldone;
}
od = cod;
+ MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openReadFromWriter);
return handleEvent(EVENT_IMMEDIATE, 0);
}
if (dir_probe(&key, part, &dir, &last_collision)) {
first_dir = dir;
- return do_read(&key);
+ int ret = do_read_call(&key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
}
}
Ldone:
@@ -1059,4 +1098,6 @@
_action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *) -err);
}
return free_CacheVC(this);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
}
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc Fri Dec 18 16:47:37 2009
@@ -28,13 +28,20 @@
#define HDR_PTR_SIZE (sizeof(inku64))
#define HDR_PTR_ALIGNMENT_MASK ((HDR_PTR_SIZE) - 1L)
-#define CACHE_AGGREGATE_WRITE 1
-#define MAX_AGG_LEN (256*1024)
-
-extern int cache_config_max_agg_delay;
-extern int cache_config_check_disk_idle;
extern int cache_config_agg_write_backlog;
+static inline int power_of_2(inku32 x) { // returns 1 when x has at most one bit set, 0 otherwise
+ while (x) { // x == 0 never enters the loop and falls through to the final return 1
+ if (x & 1) { // reached the lowest set bit
+ if (x >> 1) // some higher bit is set as well: not a power of two
+ return 0;
+ return 1; // the lowest set bit was the only one
+ }
+ x >>= 1; // drop a trailing zero bit and keep scanning
+ }
+ return 1; // only reachable when the argument was 0
+}
+
// Given a key, finds the index of the alternate which matches
// used to get the alternate which is actually present in the document
#ifdef HTTP_CACHE
@@ -70,79 +77,82 @@
cancel_trigger();
if (od->reading_vec || od->writing_vec)
VC_SCHED_LOCK_RETRY();
+ int ret = 0;
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock || od->writing_vec)
+ VC_SCHED_LOCK_RETRY();
- CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (!lock || od->writing_vec)
- VC_SCHED_LOCK_RETRY();
-
- int vec = alternate.valid();
- if (f.update) {
- // all Update cases. Need to get the alternate index.
- alternate_index = get_alternate_index(write_vector, update_key);
- Debug("cache_update", "updating alternate index %d", alternate_index);
- // if its an alternate delete
- if (!vec) {
- ink_assert(!total_len);
- if (alternate_index >= 0) {
- write_vector->remove(alternate_index, true);
- alternate_index = CACHE_ALT_REMOVED;
- if (!write_vector->count())
- dir_delete(&first_key, part, &od->first_dir);
- }
- // the alternate is not there any more. somebody might have
- // deleted it. Just close this writer
- if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
- SET_HANDLER(&CacheVC::openWriteCloseDir);
- return openWriteCloseDir(EVENT_IMMEDIATE, 0);
+ int vec = alternate.valid();
+ if (f.update) {
+ // all Update cases. Need to get the alternate index.
+ alternate_index = get_alternate_index(write_vector, update_key);
+ Debug("cache_update", "updating alternate index %d", alternate_index);
+ // if its an alternate delete
+ if (!vec) {
+ ink_assert(!total_len);
+ if (alternate_index >= 0) {
+ write_vector->remove(alternate_index, true);
+ alternate_index = CACHE_ALT_REMOVED;
+ if (!write_vector->count())
+ dir_delete(&first_key, part, &od->first_dir);
+ }
+ // the alternate is not there any more. somebody might have
+ // deleted it. Just close this writer
+ if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
+ SET_HANDLER(&CacheVC::openWriteCloseDir);
+ return openWriteCloseDir(EVENT_IMMEDIATE, 0);
+ }
}
+ if (update_key == od->single_doc_key && (total_len || !vec))
+ od->move_resident_alt = 0;
}
- if (update_key == od->single_doc_key && (total_len || !vec))
- od->move_resident_alt = 0;
- }
- if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
-
- if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
- od->move_resident_alt = 0;
+ if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
+ if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
+ od->move_resident_alt = 0;
+ write_vector->remove(0, true);
+ }
+ if (vec)
+ alternate_index = write_vector->insert(&alternate, alternate_index);
- write_vector->remove(0, true);
- }
- if (vec)
- alternate_index = write_vector->insert(&alternate, alternate_index);
-
- if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
- Doc *doc = (Doc *) first_buf->data();
- int small_doc = doc->data_len() < cache_config_alt_rewrite_max_size;
- int have_res_alt = doc->key == od->single_doc_key;
- // if the new alternate is not written with the vector
- // then move the old one with the vector
- // if its a header only update move the resident alternate
- // with the vector.
- // We are sure that the body of the resident alternate that we are
- // rewriting has not changed and the alternate is not being deleted,
- // since we set od->move_resident_alt to 0 in that case
- // (in updateVector)
- if (small_doc && have_res_alt && (segment || (f.update && !total_len))) {
- // for multiple segment document, we must have done
- // CacheVC:openWriteCloseDataDone
- ink_assert(!segment || f.data_done);
- od->move_resident_alt = 0;
- f.rewrite_resident_alt = 1;
- write_len = doc->data_len();
- Debug("cache_update_alt",
- "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.word(0), first_key.word(0));
+ if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
+ Doc *doc = (Doc *) first_buf->data();
+ int small_doc = (ink64)doc->data_len() < (ink64)cache_config_alt_rewrite_max_size;
+ int have_res_alt = doc->key == od->single_doc_key;
+ // if the new alternate is not written with the vector
+ // then move the old one with the vector
+ // if its a header only update move the resident alternate
+ // with the vector.
+ // We are sure that the body of the resident alternate that we are
+ // rewriting has not changed and the alternate is not being deleted,
+ // since we set od->move_resident_alt to 0 in that case
+ // (in updateVector)
+ if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
+ // for multiple fragment document, we must have done
+ // CacheVC:openWriteCloseDataDone
+ ink_assert(!fragment || f.data_done);
+ od->move_resident_alt = 0;
+ f.rewrite_resident_alt = 1;
+ write_len = doc->data_len();
+ Debug("cache_update_alt",
+ "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.word(0), first_key.word(0));
+ }
}
+ od->writing_vec = 1;
+ f.use_first_key = 1;
+ SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
+ ret = do_write_call();
}
- od->writing_vec = 1;
- f.use_first_key = 1;
- SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
- return do_write();
+ if (ret == EVENT_RETURN)
+ return handleEvent(AIO_EVENT_DONE, 0);
+ return ret;
}
#endif
/*
The following fields of the CacheVC are used when writing down a fragment.
Make sure that each of the fields is set to a valid value before calling
this function
- - f.http_request. Checked to see if a vector needs to be marshalled.
+ - frag_type. Checked to see if a vector needs to be marshalled.
- f.use_first_key. To decide if the vector should be marshalled and to set
the doc->key to the appropriate key (first_key or earliest_key)
- f.evac_vector. If set, the writer is pushed in the beginning of the
@@ -154,7 +164,7 @@
- f.rewrite_resident_alt. The resident alternate is rewritten.
- f.update. Used only if the write_vector needs to be written to disk.
Used to set the length of the alternate to total_len.
- - write_vector. Used only if f.http_request &&
+ - write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP &&
(f.use_fist_key || f.evac_vector) is set. Write_vector is written to disk
- alternate_index. Used only if write_vector needs to be written to disk.
Used to find out the VC's alternate in the write_vector and set its
@@ -182,20 +192,25 @@
// plain write case
ink_assert(!trigger);
#ifdef HTTP_CACHE
- if (f.http_request && (f.use_first_key || f.evac_vector)) {
+ if (frag_type == CACHE_FRAG_TYPE_HTTP && (f.use_first_key || f.evac_vector)) {
ink_assert(od->writing_vec);
- vec_len = write_vector->marshal_length();
- ink_assert(vec_len > 0);
+ header_len = write_vector->marshal_length();
+ ink_assert(header_len > 0);
} else
#endif
- vec_len = 0;
+ header_len = header_to_write_len;
+ if (f.use_first_key && fragment) {
+ frag_len = (fragment-1) * sizeof(Frag);
+ } else
+ frag_len = 0;
set_agg_write_in_progress();
POP_HANDLER;
- agg_len = round_to_approx_size(write_len + vec_len + sizeofDoc);
+ agg_len = round_to_approx_size(write_len + header_len + frag_len + sizeofDoc);
part->agg_todo_size += agg_len;
ink_assert(agg_len <= AGG_SIZE);
- bool agg_error = (agg_len > AGG_SIZE ||
- (!f.readers && (part->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
+ bool agg_error =
+ (agg_len > AGG_SIZE ||
+ (!f.readers && (part->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
#ifdef CACHE_AGG_FAIL_RATE
agg_error = agg_error || ((inku32) mutex->thread_holding->generator.random() <
(inku32) (UINT_MAX * CACHE_AGG_FAIL_RATE));
@@ -215,10 +230,8 @@
part->agg.push(this);
else
part->agg.enqueue(this);
-
- if (!part->is_io_in_progress()) {
- part->aggWrite(EVENT_IMMEDIATE, 0);
- }
+ if (!part->is_io_in_progress())
+ return part->aggWrite(event, this);
return EVENT_CONT;
}
@@ -279,7 +292,7 @@
// we can't evacuate anything between header->write_pos and
// header->write_pos + AGG_SIZE.
int ps = offset_to_part_offset(this, header->write_pos + AGG_SIZE);
- int pe = offset_to_part_offset(this, header->write_pos + 2 * EVAC_SIZE + (len / PIN_SCAN_EVERY));
+ int pe = offset_to_part_offset(this, header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
int part_end_offset = offset_to_part_offset(this, len + skip);
int before_end_of_part = pe < part_end_offset;
Debug("cache_evac", "scan %d %d", ps, pe);
@@ -303,63 +316,20 @@
}
}
-int
-PartCallback::aggWriteDone(int event, Event * e)
-{
- NOWARN_UNUSED(e);
- NOWARN_UNUSED(event);
-
- if (trigger) {
- trigger->cancel_action();
- trigger = NULL;
- }
-
- CacheVC *c;
- Queue<CacheVC> not_done;
- EThread *t = mutex->thread_holding;
- while ((c = (CacheVC *) write_done.dequeue())) {
- if (!c->mutex->is_thread() || c->mutex->thread_holding == t) {
- CACHE_TRY_LOCK(lock, c->mutex, t);
- if (!lock) {
- not_done.enqueue(c);
- continue;
- }
- c->handleEvent(AIO_EVENT_DONE, 0);
- } else {
- not_done.enqueue(c);
- continue;
- }
- }
- if (not_done.head) {
- write_done = not_done;
- if (write_done.head->mutex->is_thread() && write_done.head->mutex->thread_holding != t)
- trigger = write_done.head->mutex->thread_holding->schedule_imm(this);
- else
- trigger = eventProcessor.schedule_in(this, MUTEX_RETRY_DELAY);
- return EVENT_CONT;
- }
- return EVENT_DONE;
-}
-
/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
DON'T schedule any events on this thread using VC_SCHED_XXX or
mutex->thread_holding->schedule_xxx_local(). ALWAYS use
eventProcessor.schedule_xxx().
*/
int
-Part::aggWriteDone(int event, Event * e)
+Part::aggWriteDone(int event, Event *e)
{
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
cancel_trigger();
- ProxyMutex *dir_sync_lock;
- if (dir_sync_waiting)
- dir_sync_lock = cacheDirSync->mutex;
- else
- dir_sync_lock = mutex;
- CACHE_TRY_LOCK(lock, dir_sync_lock, mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
if (!lock) {
// INKqa10347
// Race condition between cacheDirSync and the part when setting the
@@ -367,22 +337,25 @@
eventProcessor.schedule_in(this, MUTEX_RETRY_DELAY);
return EVENT_CONT;
}
-
if (io.ok()) {
header->last_write_pos = header->write_pos;
header->write_pos += io.aiocb.aio_nbytes;
- Debug("cache_agg", "Dir %s, Write: %llu, last Write: %llu\n", hash_id, header->write_pos, header->last_write_pos);
+ ink_assert(header->write_pos >= start);
+ Debug("cache_agg", "Dir %s, Write: %llu, last Write: %llu\n",
+ hash_id, header->write_pos, header->last_write_pos);
ink_assert(header->write_pos == header->agg_pos);
- if (header->write_pos + EVAC_SIZE > scan_pos)
+ if (header->write_pos + EVACUATION_SIZE > scan_pos)
periodic_scan();
-
agg_buf_pos = 0;
header->write_serial++;
} else {
// delete all the directory entries that we inserted
// for fragments is this aggregation buffer
Debug("cache_disk_error", "Write error on disk %s\n \
- write range : [%llu - %llu bytes] [%llu - %llu blocks] \n", hash_id, io.aiocb.aio_offset, io.aiocb.aio_offset + io.aiocb.aio_nbytes, io.aiocb.aio_offset / 512, (io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
+ write range : [%llu - %llu bytes] [%llu - %llu blocks] \n",
+ hash_id, io.aiocb.aio_offset, io.aiocb.aio_offset + io.aiocb.aio_nbytes,
+ io.aiocb.aio_offset / INK_BLOCK_SIZE,
+ (io.aiocb.aio_offset + io.aiocb.aio_nbytes) / INK_BLOCK_SIZE);
Dir del_dir;
dir_clear(&del_dir);
for (int done = 0; done < agg_buf_pos;) {
@@ -393,31 +366,25 @@
}
agg_buf_pos = 0;
}
-
set_io_not_in_progress();
SET_HANDLER(&Part::aggWrite);
-
if (dir_sync_waiting) {
dir_sync_waiting = 0;
cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
}
-
if (agg.head)
- return aggWrite(EVENT_IMMEDIATE, e);
+ return aggWrite(event, e);
return EVENT_CONT;
}
-extern volatile int cachewrite_buf_data;
-
CacheVC *
new_DocEvacuator(int nbytes, Part * part)
{
CacheVC *c = new_CacheVC(part);
ProxyMutex *mutex = part->mutex;
c->base_stat = cache_evacuate_active_stat;
-
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
- c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes), MEMALIGNED);
+ c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
c->part = part;
c->f.evacuator = 1;
c->earliest_key = zero_key;
@@ -449,9 +416,9 @@
goto Lcollision;
#ifdef HTTP_CACHE
alternate_tmp = 0;
- if (doc->hlen) {
- //its an http document
- if (vector.get_handles(doc->hdr, doc->hlen) != doc->hlen) {
+ if (doc->ftype == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
+ // its an http document
+ if (vector.get_handles(doc->hdr(), doc->hlen) != doc->hlen) {
Note("bad vector detected during evacuation");
goto Ldone;
}
@@ -481,11 +448,13 @@
return free_CacheVC(this);
}
return EVENT_CONT;
-
Lcollision:
- if (dir_probe(&first_key, part, &dir, &last_collision))
- return do_read(&first_key);
-
+ if (dir_probe(&first_key, part, &dir, &last_collision)) {
+ int ret = do_read_call(&first_key);
+ if (ret == EVENT_RETURN)
+ return handleEvent(AIO_EVENT_DONE, 0);
+ return ret;
+ }
Ldone:
dir_lookaside_remove(&earliest_key, part);
return free_CacheVC(this);
@@ -508,10 +477,10 @@
for (; b; b = b->link.next) {
if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
- // If the document is single segment (although not tied to the vector),
+ // If the document is single fragment (although not tied to the vector),
// then we don't have to put the directory entry in the lookaside
// buffer. But, we have no way of finding out if the document is
- // single segment. doc->single_segment() can be true for a multiple
+ // single fragment. doc->single_fragment() can be true for a multiple
// fragment document since total_len and doc->len could be equal at
// the time we write the fragment down. To be on the safe side, we
// only overwrite the entry in the directory if its not a head.
@@ -575,7 +544,10 @@
dir_lookaside_insert(b, part, &earliest_dir);
// read the vector
SET_HANDLER(&CacheVC::evacuateReadHead);
- return do_read(&first_key);
+ int ret = do_read_call(&first_key);
+ if (ret == EVENT_RETURN)
+ return handleEvent(AIO_EVENT_DONE, 0);
+ return ret;
}
}
}
@@ -586,7 +558,7 @@
}
int
-evacuate_segments(CacheKey * key, CacheKey * earliest_key, int force, Part * part)
+evacuate_fragments(CacheKey * key, CacheKey * earliest_key, int force, Part * part)
{
Dir dir, *last_collision = 0;
int i = 0;
@@ -615,7 +587,7 @@
if (force)
b->f.readers = 0;
Debug("cache_evac",
- "next segment %X Earliest: %X offset %d phase %d force %d",
+ "next fragment %X Earliest: %X offset %d phase %d force %d",
(int) key->word(0), (int) earliest_key->word(0), (int) dir_offset(&dir), (int) dir_phase(&dir), force);
}
return i;
@@ -639,7 +611,6 @@
ink_assert(evacuator->agg_len <= AGG_SIZE);
agg.insert(evacuator, after);
return aggWrite(event, e);
-
}
int
@@ -648,9 +619,8 @@
NOWARN_UNUSED(e);
cancel_trigger();
- if (event != AIO_EVENT_DONE) {
+ if (event != AIO_EVENT_DONE)
return EVENT_DONE;
- }
ink_assert(is_io_in_progress());
set_io_not_in_progress();
ink_debug_assert(mutex->thread_holding == this_ethread());
@@ -717,11 +687,12 @@
// Cache::open_write).
if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
next_CacheKey(&next_key, &doc->key);
- evacuate_segments(&next_key, &doc_evacuator->earliest_key, !b->f.readers, this);
+ evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->f.readers, this);
}
return evacuateWrite(doc_evacuator, event, e);
Ldone:
free_CacheVC(doc_evacuator);
+ doc_evacuator = 0;
return aggWrite(event, e);
}
@@ -733,7 +704,6 @@
int si = dir_offset_evac_bucket(s);
int ei = dir_offset_evac_bucket(e);
- // Debug("cache_evac", "evac_range %d %d %d %d", si, ei, s, e);
for (int i = si; i <= ei; i++) {
EvacuationBlock *b = evacuate[i].head;
EvacuationBlock *first = 0;
@@ -741,8 +711,6 @@
for (; b; b = b->link.next) {
int offset = dir_offset(&b->dir);
int phase = dir_phase(&b->dir);
- // Debug("cache_evac", "evac_range test %X %d %d",
- // b->key.word(0), offset, b->f.done);
if (offset >= s && offset < e && !b->f.done && phase == evac_phase)
if (offset < first_offset) {
first = b;
@@ -782,9 +750,9 @@
Doc *doc = (Doc *) p;
IOBufferBlock *res_alt_blk = 0;
- int seglen = vc->write_len + vc->vec_len + sizeofDoc;
- ink_assert(!vc->f.http_request || seglen != sizeofDoc);
- int writelen = round_to_approx_size(seglen);
+ inku32 len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
+ ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
+ int writelen = round_to_approx_size(len);
// update copy of directory entry for this document
dir_set_approx_size(&vc->dir, writelen);
dir_set_offset(&vc->dir, offset_to_part_offset(part, o));
@@ -793,8 +761,10 @@
// fill in document header
doc->magic = DOC_MAGIC;
- doc->len = seglen;
- doc->hlen = vc->vec_len;
+ doc->len = len;
+ doc->hlen = vc->header_len;
+ doc->ftype = vc->frag_type;
+ doc->flen = vc->frag_len;
doc->total_len = vc->total_len;
doc->first_key = vc->first_key;
doc->sync_serial = part->header->sync_serial;
@@ -818,7 +788,7 @@
dir_set_head(&vc->dir, true);
} else {
doc->key = vc->key;
- dir_set_head(&vc->dir, !vc->segment);
+ dir_set_head(&vc->dir, !vc->fragment);
}
#ifdef HTTP_CACHE
@@ -831,28 +801,31 @@
}
#endif
// update the new_info object_key, and total_len and dirinfo
-#ifdef HTTP_CACHE
- if (vc->vec_len) {
+ if (vc->header_len) {
ink_debug_assert(vc->f.use_first_key);
- ink_debug_assert(vc->write_vector->count() > 0);
- if (!vc->f.update && !vc->f.evac_vector) {
- ink_debug_assert(!(vc->first_key == zero_key));
- CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
- http_info->object_size_set(vc->total_len);
- }
- // update + data_written => Update case (b)
- // need to change the old alternate's object length
- if (vc->f.update && vc->total_len) {
- CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
- http_info->object_size_set(vc->total_len);
- }
- ink_assert(!(((unsigned long) &doc->hdr[0]) & HDR_PTR_ALIGNMENT_MASK));
- ink_assert(vc->vec_len == vc->write_vector->marshal(&doc->hdr[0], vc->vec_len));
- // the single segment flag is not used in the write call.
+#ifdef HTTP_CACHE
+ if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
+ ink_debug_assert(vc->write_vector->count() > 0);
+ if (!vc->f.update && !vc->f.evac_vector) {
+ ink_debug_assert(!(vc->first_key == zero_key));
+ CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+ http_info->object_size_set(vc->total_len);
+ }
+ // update + data_written => Update case (b)
+ // need to change the old alternate's object length
+ if (vc->f.update && vc->total_len) {
+ CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+ http_info->object_size_set(vc->total_len);
+ }
+ ink_assert(!(((unsigned long) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
+ ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
+ } else
+#endif
+ memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
+ // the single fragment flag is not used in the write call.
// putting it in for completeness.
- vc->f.single_segment = doc->single_segment();
+ vc->f.single_fragment = doc->single_fragment();
}
-#endif
// move data
if (vc->write_len) {
{
@@ -862,31 +835,29 @@
}
#ifdef HTTP_CACHE
if (vc->f.rewrite_resident_alt)
- iobufferblock_memcpy(&doc->hdr[vc->vec_len], vc->write_len, res_alt_blk, 0);
+ iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
else
#endif
- iobufferblock_memcpy(&doc->hdr[vc->vec_len], vc->write_len, vc->blocks, vc->offset);
+ iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks, vc->offset);
#ifdef VERIFY_JTEST_DATA
- if (f.use_first_key && vec_len) {
+ if (f.use_first_key && header_len) {
int ib = 0, xd = 0;
char xx[500];
new_info.request_get().url_get().print(xx, 500, &ib, &xd);
char *x = xx;
for (int q = 0; q < 3; q++)
x = strchr(x + 1, '/');
- ink_assert(!memcmp(&doc->hdr[vec_len], x, ib - (x - xx)));
+ ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
}
#endif
}
if (cache_config_enable_checksum) {
doc->checksum = 0;
- for (char *b = doc->hdr; b < (char *) doc + doc->len; b++) {
+ for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
doc->checksum += *b;
- }
}
-
- if (vc->f.http_request && vc->f.single_segment)
+ if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment)
ink_assert(doc->hlen);
if (res_alt_blk)
@@ -994,86 +965,76 @@
the eventProcessor to schedule events
*/
int
-Part::aggWrite(int event, Event * e)
+Part::aggWrite(int event, void *e)
{
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
+ Que(CacheVC, link) tocall;
+ CacheVC *c;
+
cancel_trigger();
ink_assert(!is_io_in_progress());
Lagain:
// calculate length of aggregated write
- CacheVC * c;
-
for (c = (CacheVC *) agg.head; c;) {
int writelen = c->agg_len;
ink_assert(writelen < AGG_SIZE);
- if (agg_buf_pos + writelen > AGG_SIZE || header->write_pos + agg_buf_pos + writelen > (skip + len))
+ if (agg_buf_pos + writelen > AGG_SIZE ||
+ header->write_pos + agg_buf_pos + writelen > (skip + len))
break;
- Debug("agg_read", "copying: %d, %llu, key: %d", agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.word(0));
- agg_copy(agg_buffer + agg_buf_pos, c);
+ Debug("agg_read", "copying: %d, %llu, key: %d",
+ agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.word(0));
+ int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
+ ink_assert(writelen == wrotelen);
agg_todo_size -= writelen;
agg_buf_pos += writelen;
- CacheVC *n = (CacheVC *) c->link.next;
+ CacheVC *n = (CacheVC *)c->link.next;
agg.dequeue();
if (c->f.evacuator)
c->handleEvent(AIO_EVENT_DONE, 0);
else
- callback_cont->write_done.enqueue(c);
+ tocall.enqueue(c);
c = n;
}
// if we got nothing...
if (!agg_buf_pos) {
- if (!agg.head) {
- // nothing to get
- return EVENT_DONE;
- }
+ if (!agg.head) // nothing to get
+ return EVENT_CONT;
if (header->write_pos == start) {
// write aggregation too long, bad bad, punt on everything.
- //action_tag_assert("cache", false);
Note("write aggregation exceeds part size");
- if (is_action_tag_set("cache")) {
- ink_release_assert(false);
- }
- CacheVC *vc;
- while ((vc = agg.dequeue())) {
- agg_todo_size -= vc->agg_len;
- // signal failure?
- callback_cont->write_done.enqueue(vc);
- if (!callback_cont->trigger)
- callback_cont->trigger = eventProcessor.schedule_imm(callback_cont);
+ ink_assert(!tocall.head);
+ ink_assert(false);
+ while ((c = agg.dequeue())) {
+ agg_todo_size -= c->agg_len;
+ c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
}
- return EVENT_DONE;
+ return EVENT_CONT;
}
// start back
agg_wrap();
goto Lagain;
}
- if (!callback_cont->trigger) {
- if (callback_cont->write_done.head)
- callback_cont->trigger = eventProcessor.schedule_imm(callback_cont);
- }
+ // set write limit
+ header->agg_pos = header->write_pos + agg_buf_pos;
+
// evacuate space
- ink_off_t end = header->write_pos + agg_buf_pos + EVAC_SIZE;
+ ink_off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
if (evac_range(header->write_pos, end, !header->phase) < 0)
- return EVENT_CONT;
+ goto Lwait;
if (end > skip + len)
if (evac_range(start, start + (end - (skip + len)), header->phase))
- return EVENT_CONT;
+ goto Lwait;
// if agg.head, then we are near the end of the disk, so
// write down the aggregation in whatever size it is.
- if (agg_buf_pos < MAX_AGG_LEN && !agg.head && !dir_sync_waiting)
- return EVENT_CONT;
- Debug("agg_read", "flushing: %d", agg_buf_pos);
-
- // set write limit
- header->agg_pos = header->write_pos + agg_buf_pos;
+ if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !dir_sync_waiting)
+ goto Lwait;
- // do write
io.aiocb.aio_fildes = fd;
io.aiocb.aio_offset = header->write_pos;
io.aiocb.aio_buf = agg_buffer;
@@ -1082,7 +1043,16 @@
io.thread = mutex->thread_holding;
SET_HANDLER(&Part::aggWriteDone);
ink_aio_write(&io);
- return EVENT_CONT;
+
+Lwait:
+ int ret = EVENT_CONT;
+ while ((c = tocall.dequeue())) {
+ if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
+ ret = EVENT_RETURN;
+ else
+ c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
+ }
+ return ret;
}
int
@@ -1100,21 +1070,21 @@
VC_SCHED_LOCK_RETRY();
}
part->close_write(this);
- if (closed < 0 && segment)
+ if (closed < 0 && fragment)
dir_delete(&earliest_key, part, &earliest_dir);
}
if (is_debug_tag_set("cache_update")) {
if (f.update && closed > 0) {
if (!total_len && alternate_index != CACHE_ALT_REMOVED) {
Debug("cache_update", "header only %d (%llu, %llu)\n",
- DIR_MASK_TAG(first_key.word(1)), update_key.b[0], update_key.b[1]);
+ DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1]);
} else if (total_len && alternate_index != CACHE_ALT_REMOVED) {
Debug("cache_update", "header body, %d, (%llu, %llu), (%llu, %llu)\n",
- DIR_MASK_TAG(first_key.word(1)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
+ DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
} else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
Debug("cache_update", "alt delete, %d, (%llu, %llu)\n",
- DIR_MASK_TAG(first_key.word(1)), update_key.b[0], update_key.b[1]);
+ DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1]);
}
}
}
@@ -1123,29 +1093,14 @@
// one, two and three or more fragments. This is because for
// updates we dont decrement the variable corresponding the old
// size of the document
- /*
- Debug("segment_size","Segment = %d\n",segment);
- Debug("segment_size","total_len = %d\n",total_len);
- Debug("segment_size","closed = %d\n",closed);
- Debug("segment_size","f.update = %d\n",f.update);
- */
if ((closed == 1) && (total_len > 0)) {
- Debug("cache_stats", "Segment = %d", segment);
- switch (segment) {
- case 0:
- CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat);
- break;
-
- case 1:
- CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat);
- break;
-
- default:
- CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat);
- break;
+ Debug("cache_stats", "Fragment = %d", fragment);
+ switch (fragment) {
+ case 0: CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat); break;
+ case 1: CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat); break;
+ default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
}
}
-
return free_CacheVC(this);
}
@@ -1153,43 +1108,47 @@
CacheVC::openWriteCloseHeadDone(int event, Event * e)
{
NOWARN_UNUSED(e);
- ink_assert(event == AIO_EVENT_DONE);
- set_io_not_in_progress();
- od->writing_vec = 0;
-
- if (!io.ok())
- return openWriteCloseDir(event, e);
- ink_assert(f.use_first_key);
- // lock taken by caller
- ink_debug_assert(part->mutex->thread_holding == this_ethread());
- if (!od->dont_update_directory) {
- if (dir_is_empty(&od->first_dir)) {
- dir_insert(&first_key, part, &dir);
- } else {
- // multiple segment vector write
- dir_overwrite(&first_key, part, &dir, &od->first_dir, false);
- // insert moved resident alternate
- if (od->move_resident_alt) {
- if (dir_valid(part, &od->single_doc_dir))
- dir_insert(&od->single_doc_key, part, &od->single_doc_dir);
- od->move_resident_alt = 0;
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock)
+ VC_LOCK_RETRY_EVENT();
+ if (event == AIO_EVENT_DONE)
+ set_io_not_in_progress();
+ else if (is_io_in_progress())
+ return EVENT_CONT;
+ od->writing_vec = 0;
+ if (!io.ok())
+ goto Lclose;
+ ink_assert(f.use_first_key);
+ if (!od->dont_update_directory) {
+ if (dir_is_empty(&od->first_dir)) {
+ dir_insert(&first_key, part, &dir);
+ } else {
+ // multiple fragment vector write
+ dir_overwrite(&first_key, part, &dir, &od->first_dir, false);
+ // insert moved resident alternate
+ if (od->move_resident_alt) {
+ if (dir_valid(part, &od->single_doc_dir))
+ dir_insert(&od->single_doc_key, part, &od->single_doc_dir);
+ od->move_resident_alt = 0;
+ }
}
- }
- od->first_dir = dir;
- if (f.http_request && f.single_segment) {
- // segment is tied to the vector
- od->move_resident_alt = 1;
- if (!f.rewrite_resident_alt) {
- od->single_doc_key = earliest_key;
+ od->first_dir = dir;
+ if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
+ // fragment is tied to the vector
+ od->move_resident_alt = 1;
+ if (!f.rewrite_resident_alt) {
+ od->single_doc_key = earliest_key;
+ }
+ dir_assign(&od->single_doc_dir, &dir);
+ dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
}
- dir_assign(&od->single_doc_dir, &dir);
- dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(1));
}
}
+Lclose:
return openWriteCloseDir(event, e);
}
-
int
CacheVC::openWriteCloseHead(int event, Event * e)
{
@@ -1198,7 +1157,7 @@
cancel_trigger();
f.use_first_key = 1;
if (io.ok())
- ink_assert(segment || (length == total_len));
+ ink_assert(fragment || (length == total_len));
else
return openWriteCloseDir(event, e);
if (f.data_done)
@@ -1206,7 +1165,7 @@
else
write_len = length;
#ifdef HTTP_CACHE
- if (f.http_request) {
+ if (frag_type == CACHE_FRAG_TYPE_HTTP) {
SET_HANDLER(&CacheVC::updateVector);
return updateVector(EVENT_IMMEDIATE, 0);
} else {
@@ -1214,7 +1173,6 @@
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
return do_write_lock();
#ifdef HTTP_CACHE
-
}
#endif
}
@@ -1224,26 +1182,29 @@
{
NOWARN_UNUSED(e);
- if (event == AIO_EVENT_DONE) {
+ if (event == AIO_EVENT_DONE)
set_io_not_in_progress();
-
- if (!io.ok())
- return openWriteCloseDir(event, e);
+ else if (is_io_in_progress())
+ return EVENT_CONT;
+ if (!io.ok())
+ return openWriteCloseDir(event, e);
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, this_ethread());
+ if (!lock)
+ VC_LOCK_RETRY_EVENT();
+ fragment++;
+ write_pos += write_len;
+ dir_insert(&key, part, &dir);
+ f.data_done = 1;
+ ink_assert(fragment);
+ return openWriteCloseHead(event, e); // must be called under part lock from here
}
- // locked taken by caller
- ink_debug_assert(part->mutex->thread_holding == this_ethread());
- dir_insert(&key, part, &dir);
-
- f.data_done = 1;
- ink_assert(segment);
- return openWriteCloseHead(event, e);
}
int
CacheVC::openWriteClose(int event, Event * e)
{
NOWARN_UNUSED(e);
-
cancel_trigger();
if (is_io_in_progress()) {
if (event != AIO_EVENT_DONE)
@@ -1265,18 +1226,16 @@
}
#else
return openWriteCloseDir(event, e);
-#endif //HTTP_CACHE
-
+#endif
}
- if (length && segment) {
+ if (length && fragment) {
SET_HANDLER(&CacheVC::openWriteCloseDataDone);
write_len = length;
- return do_write_lock();
+ return do_write_lock_call();
} else
return openWriteCloseHead(event, e);
- } else {
+ } else
return openWriteCloseDir(event, e);
- }
}
int
@@ -1285,11 +1244,11 @@
NOWARN_UNUSED(e);
cancel_trigger();
- ink_assert(is_io_in_progress());
- if (event != AIO_EVENT_DONE)
- return EVENT_CONT;
-
- set_io_not_in_progress();
+ if (event == AIO_EVENT_DONE)
+ set_io_not_in_progress();
+ else
+ if (is_io_in_progress())
+ return EVENT_CONT;
// In the event of VC_EVENT_ERROR, the cont must do an io_close
if (!io.ok()) {
if (closed) {
@@ -1300,18 +1259,36 @@
calluser(VC_EVENT_ERROR);
return EVENT_CONT;
}
- // store the earliest directory. Need to remove the earliest dir
- // in case the writer aborts.
- if (!segment) {
- ink_assert(key == earliest_key);
- earliest_dir = dir;
- }
- segment++;
- dir_insert(&key, part, &dir);
- Debug("cache_insert", "WriteDone: %X, %X, %d", key.word(0), first_key.word(0), write_len);
- blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
- write_len = length;
- next_CacheKey(&key, &key);
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock)
+ VC_LOCK_RETRY_EVENT();
+ // store the earliest directory. Need to remove the earliest dir
+ // in case the writer aborts.
+ if (!fragment) {
+ ink_assert(key == earliest_key);
+ earliest_dir = dir;
+ } else {
+ if (!frag)
+ frag = &integral_frags[0];
+ else {
+ if (fragment-1 >= INTEGRAL_FRAGS && power_of_2((inku32)(fragment-1))) {
+ Frag *t = frag;
+ frag = (Frag*)xmalloc(sizeof(Frag) * (fragment-1)*2);
+ memcpy(frag, t, sizeof(Frag) * (fragment-2));
+ if (t != integral_frags)
+ xfree(t);
+ }
+ }
+ frag[fragment-1].offset = write_pos;
+ }
+ fragment++;
+ write_pos += write_len;
+ dir_insert(&key, part, &dir);
+ Debug("cache_insert", "WriteDone: %X, %X, %d", key.word(0), first_key.word(0), write_len);
+ blocks = iobufferblock_skip(blocks, &offset, (int*)&length, write_len);
+ next_CacheKey(&key, &key);
+ }
if (closed)
return die();
SET_HANDLER(&CacheVC::openWriteMain);
@@ -1340,10 +1317,10 @@
if (vio.ntodo() <= 0)
return EVENT_CONT;
}
- ink64 ntodo = (ink64) vio.ntodo() + length;
- int total_avail = vio.buffer.reader()->read_avail();
- int avail = total_avail;
- int towrite = avail + length;
+ ink64 ntodo = (ink64)(vio.ntodo() + length);
+ ink64 total_avail = vio.buffer.reader()->read_avail();
+ ink64 avail = total_avail;
+ ink64 towrite = avail + length;
if (towrite > ntodo) {
avail -= (towrite - ntodo);
towrite = ntodo;
@@ -1361,7 +1338,7 @@
vio.ndone += avail;
total_len += avail;
}
- length = towrite;
+ length = (inku64)towrite;
if (length > TARGET_FRAG_SIZE && length < SHRINK_TARGET_FRAG_SIZE)
write_len = TARGET_FRAG_SIZE;
else
@@ -1380,7 +1357,7 @@
return EVENT_CONT;
SET_HANDLER(&CacheVC::openWriteWriteDone);
- return do_write_lock();
+ return do_write_lock_call();
}
// begin overwrite
@@ -1388,7 +1365,6 @@
CacheVC::openWriteOverwrite(int event, Event * e)
{
NOWARN_UNUSED(e);
- int res;
cancel_trigger();
if (event != AIO_EVENT_DONE) {
@@ -1409,20 +1385,24 @@
od->first_dir = dir;
goto Ldone;
}
-
Lcollision:
{
CACHE_TRY_LOCK(lock, part->mutex, this_ethread());
if (!lock)
- VC_SCHED_LOCK_RETRY();
- res = dir_probe(&first_key, part, &dir, &last_collision);
- if (res > 0)
- return do_read(&first_key);
+ VC_LOCK_RETRY_EVENT();
+ int res = dir_probe(&first_key, part, &dir, &last_collision);
+ if (res > 0) {
+ if ((res = do_read_call(&first_key)) == EVENT_RETURN)
+ goto Lcallreturn;
+ return res;
+ }
}
Ldone:
SET_HANDLER(&CacheVC::openWriteMain);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) this);
return EVENT_DONE;
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
}
#ifdef HTTP_CACHE
@@ -1458,7 +1438,7 @@
if (!dir_valid(part, &dir)) {
Debug("cache_write",
"OpenReadStartDone: Dir not valid: Write Head: %d, Dir: %d",
- offset_to_part_offset(part, part->header->write_pos), dir.offset);
+ offset_to_part_offset(part, part->header->write_pos), dir_offset(&dir));
last_collision = NULL;
goto Lcollision;
}
@@ -1472,21 +1452,21 @@
err = ECACHE_BAD_META_DATA;
goto Lfailure;
}
- ink_assert((((unsigned long) &doc->hdr[0]) & HDR_PTR_ALIGNMENT_MASK) == 0);
+ ink_assert((((unsigned long) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK) == 0);
- if (write_vector->get_handles(doc->hdr, doc->hlen, buf) != doc->hlen) {
+ if (write_vector->get_handles(doc->hdr(), doc->hlen, buf) != doc->hlen) {
err = ECACHE_BAD_META_DATA;
goto Lfailure;
}
ink_debug_assert(write_vector->count() > 0);
od->first_dir = dir;
first_dir = dir;
- if (doc->single_segment()) {
- // segment is tied to the vector
+ if (doc->single_fragment()) {
+ // fragment is tied to the vector
od->move_resident_alt = 1;
od->single_doc_key = doc->key;
dir_assign(&od->single_doc_dir, &dir);
- dir_set_tag(&od->single_doc_dir, DIR_MASK_TAG(od->single_doc_key.word(1)));
+ dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
}
first_buf = buf;
goto Lsuccess;
@@ -1497,12 +1477,13 @@
int if_writers = ((uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES);
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
- VC_SCHED_LOCK_RETRY();
+ VC_LOCK_RETRY_EVENT();
if (!od) {
- if ((err = part->open_write(this, if_writers,
- cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
+ if ((err = part->open_write(
+ this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
goto Lfailure;
if (od->has_multiple_writers()) {
+ MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openWriteMain);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) this);
return EVENT_DONE;
@@ -1511,13 +1492,15 @@
// check for collision
if (dir_probe(&first_key, part, &dir, &last_collision)) {
od->reading_vec = 1;
- return do_read(&first_key);
+ int ret = do_read_call(&first_key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
}
if (f.update) {
// fail update because vector has been GC'd
goto Lfailure;
}
-
}
Lsuccess:
od->reading_vec = 0;
@@ -1538,6 +1521,8 @@
return EVENT_DONE;
} else
return free_CacheVC(this);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
}
#endif
// handle lock failures from main Cache::open_write entry points below
@@ -1556,13 +1541,16 @@
VC_SCHED_LOCK_RETRY();
if ((err = part->open_write(this, false, 1)) > 0) {
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
+ free_CacheVC(this);
+ MUTEX_RELEASE(lock);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
- return free_CacheVC(this);
+ return 0;
}
if (f.overwrite) {
SET_HANDLER(&CacheVC::openWriteOverwrite);
return openWriteOverwrite(EVENT_IMMEDIATE, 0);
} else {
+ MUTEX_RELEASE(lock);
// write by key
SET_HANDLER(&CacheVC::openWriteMain);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) this);
@@ -1592,8 +1580,9 @@
Part *part = c->part;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->first_key = c->key = *key;
+ c->frag_type = frag_type;
/*
- The transition from single segment document to a multi-segment document
+ The transition from single fragment document to a multi-fragment document
would cause a problem if the key and the first_key collide. In case of
a collision, old vector data could be served to HTTP. Need to avoid that.
Also, when evacuating a fragment, we have to decide if its the first_key
@@ -1601,8 +1590,7 @@
*/
do {
rand_CacheKey(&c->key, cont->mutex);
- }
- while (DIR_MASK_TAG(c->key.word(1)) == DIR_MASK_TAG(c->first_key.word(1)));
+ } while (DIR_MASK_TAG(c->key.word(2)) == DIR_MASK_TAG(c->first_key.word(2)));
c->earliest_key = c->key;
#ifdef HTTP_CACHE
c->info = 0;
@@ -1656,7 +1644,7 @@
c->vio.op = VIO::WRITE;
c->first_key = *key;
/*
- The transition from single segment document to a multi-segment document
+ The transition from single fragment document to a multi-fragment document
would cause a problem if the key and the first_key collide. In case of
a collision, old vector data could be served to HTTP. Need to avoid that.
Also, when evacuating a fragment, we have to decide if its the first_key
@@ -1665,9 +1653,9 @@
do {
rand_CacheKey(&c->key, cont->mutex);
}
- while (DIR_MASK_TAG(c->key.word(1)) == DIR_MASK_TAG(c->first_key.word(1)));
+ while (DIR_MASK_TAG(c->key.word(2)) == DIR_MASK_TAG(c->first_key.word(2)));
c->earliest_key = c->key;
- c->f.http_request = 1;
+ c->frag_type = CACHE_FRAG_TYPE_HTTP;
c->part = key_to_part(key, hostname, host_len);
Part *part = c->part;
c->info = info;
@@ -1712,44 +1700,47 @@
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->pin_in_cache = (inku32) apin_in_cache;
- CACHE_TRY_LOCK(lock, c->part->mutex, cont->mutex->thread_holding);
- if (lock) {
- if ((err = c->part->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
- goto Lfailure;
- }
- // If there are multiple writers, then this one cannot be an update.
- // Only the first writer can do an update. If that's the case, we can
- // return success to the state machine now.;
- if (c->od->has_multiple_writers()) {
- SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
- cont->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) c);
- return ACTION_RESULT_DONE;
- }
- if (!dir_probe(key, c->part, &c->dir, &c->last_collision)) {
- if (c->f.update) {
- // fail update because vector has been GC'd
- // This situation can also arise in openWriteStartDone
- err = ECACHE_NO_DOC;
+ {
+ CACHE_TRY_LOCK(lock, c->part->mutex, cont->mutex->thread_holding);
+ if (lock) {
+ if ((err = c->part->open_write(c, if_writers,
+ cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
goto Lfailure;
+ // If there are multiple writers, then this one cannot be an update.
+ // Only the first writer can do an update. If that's the case, we can
+ // return success to the state machine now.;
+ if (c->od->has_multiple_writers())
+ goto Lmiss;
+ if (!dir_probe(key, c->part, &c->dir, &c->last_collision)) {
+ if (c->f.update) {
+ // fail update because vector has been GC'd
+ // This situation can also arise in openWriteStartDone
+ err = ECACHE_NO_DOC;
+ goto Lfailure;
+ }
+ // document doesn't exist, begin write
+ goto Lmiss;
+ } else {
+ c->od->reading_vec = 1;
+ // document exists, read vector
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
+ switch (c->do_read_call(&c->first_key)) {
+ case EVENT_DONE: return ACTION_RESULT_DONE;
+ case EVENT_RETURN: goto Lcallreturn;
+ default: return &c->_action;
+ }
}
- // document doesn't exist, begin write
- SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
- cont->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) c);
- return ACTION_RESULT_DONE;
- } else {
- c->od->reading_vec = 1;
- // document exists, read vector
- SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
- if (c->do_read(&c->first_key) == EVENT_CONT)
- return &c->_action;
- else
- return ACTION_RESULT_DONE;
}
+ // missed lock
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
+ CONT_SCHED_LOCK_RETRY(c);
+ return &c->_action;
}
- // missed lock
- SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
- CONT_SCHED_LOCK_RETRY(c);
- return &c->_action;
+
+Lmiss:
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
+ cont->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) c);
+ return ACTION_RESULT_DONE;
Lfailure:
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
@@ -1761,5 +1752,10 @@
}
free_CacheVC(c);
return ACTION_RESULT_DONE;
+
+Lcallreturn:
+ if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
+ return &c->_action;
}
#endif
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h Fri Dec 18 16:47:37 2009
@@ -128,8 +128,6 @@
struct CacheVConnection:public VConnection
{
- CacheVConnection();
-
VIO *do_io_read(Continuation * c, int nbytes, MIOBuffer * buf) = 0;
VIO *do_io_write(Continuation * c, int nbytes, IOBufferReader * buf, bool owner = false) = 0;
void do_io_close(int lerrno = -1) = 0;
@@ -141,13 +139,25 @@
ink_assert(!"CacheVConnection::do_io_shutdown unsupported");
}
+ virtual int get_header(void **ptr, int *len) = 0; // Pure virtual. NOTE(review): presumably hands back a header pointer and its length through ptr/len — confirm against an implementation.
+ virtual int set_header(void *ptr, int len) = 0; // Pure virtual. NOTE(review): presumably the setter counterpart of get_header; meaning of the int result is not shown here — confirm.
+ // do_io_pread() may only be issued once in response to CACHE_EVENT_OPEN_READ
+ virtual VIO *do_io_pread(Continuation *c, ink64 nbytes, MIOBuffer *buf, ink_off_t off) = 0; // Pure virtual. Compared with do_io_read above: nbytes is ink64 rather than int, and an ink_off_t offset is added.
+
#ifdef HTTP_CACHE
virtual void set_http_info(CacheHTTPInfo * info) = 0;
virtual void get_http_info(CacheHTTPInfo ** info) = 0;
#endif
- virtual bool is_ram_cache_hit() = 0;
virtual Action *action() = 0;
+ virtual bool is_ram_cache_hit() = 0; // Pure virtual; this change only moves the declaration below action().
+ virtual bool set_disk_io_priority(int priority) = 0; // Pure virtual, new in this change. NOTE(review): meaning of the bool result is not shown here — confirm.
+ virtual int get_disk_io_priority() = 0; // Pure virtual, new in this change; getter paired with set_disk_io_priority().
+ virtual bool set_pin_in_cache(time_t t) = 0; // Pure virtual, new in this change. NOTE(review): unclear from here whether t is absolute or relative — confirm.
+ virtual time_t get_pin_in_cache() = 0; // Pure virtual, new in this change; getter paired with set_pin_in_cache().
+ virtual int get_object_size() = 0; // Pure virtual, new in this change. NOTE(review): result is a plain int; units and large-object behavior are not shown — confirm.
+
+ CacheVConnection();
};
void ink_cache_init(ModuleVersion version);
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h Fri Dec 18 16:47:37 2009
@@ -33,10 +33,10 @@
#define CACHE_ALT_INDEX_DEFAULT -1
#define CACHE_ALT_REMOVED -2
-#define CACHE_DB_MAJOR_VERSION 19
+#define CACHE_DB_MAJOR_VERSION 20
#define CACHE_DB_MINOR_VERSION 0
-#define CACHE_DIR_MAJOR_VERSION 16
+#define CACHE_DIR_MAJOR_VERSION 17
#define CACHE_DIR_MINOR_VERSION 0
#define CACHE_DB_FDS 128
@@ -126,4 +126,11 @@
#define CacheKey INK_MD5
#define CACHE_ALLOW_MULTIPLE_WRITES 1
+
+/* uses of the CacheKey
+ word(0) - cache partition segment
+ word(1) - cache partition bucket
+ word(2) - tag (lower bits), hosttable hash (upper bits)
+ word(3) - ram cache hash, lookaside cache
+ */
#endif // __CACHE_DEFS_H__
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h Fri Dec 18 16:47:37 2009
@@ -90,6 +90,46 @@
{
return &_action;
}
+ bool set_pin_in_cache(time_t time_pin) // Unimplemented stub; time_pin is never read. Signature matches the pure virtual CacheVConnection::set_pin_in_cache declared in I_Cache.h.
+ {
+ ink_assert(!"implemented"); // !"..." negates a non-null string literal, so the asserted condition is false on every call.
+ return false; // Always false. NOTE(review): presumably only reached if ink_assert returns — confirm.
+ }
+ bool set_disk_io_priority(int priority) // Unimplemented stub; priority is never read. Signature matches the pure virtual CacheVConnection::set_disk_io_priority declared in I_Cache.h.
+ {
+ ink_assert(!"implemented"); // !"..." negates a non-null string literal, so the asserted condition is false on every call.
+ return false; // Always false. NOTE(review): presumably only reached if ink_assert returns — confirm.
+ }
+ time_t get_pin_in_cache() // Unimplemented stub. Signature matches the pure virtual CacheVConnection::get_pin_in_cache declared in I_Cache.h.
+ {
+ ink_assert(!"implemented"); // !"..." negates a non-null string literal, so the asserted condition is false on every call.
+ return 0; // Always 0. NOTE(review): presumably only reached if ink_assert returns — confirm.
+ }
+ int
+ get_disk_io_priority() // Unimplemented stub. Signature matches the pure virtual CacheVConnection::get_disk_io_priority declared in I_Cache.h.
+ {
+ ink_assert(!"implemented"); // !"..." negates a non-null string literal, so the asserted condition is false on every call.
+ return 0; // Always 0. NOTE(review): presumably only reached if ink_assert returns — confirm.
+ }
+ int get_header(void **ptr, int *len) // Unimplemented stub; neither *ptr nor *len is written. Signature matches the pure virtual CacheVConnection::get_header declared in I_Cache.h.
+ {
+ ink_assert(!"implemented"); // !"..." negates a non-null string literal, so the asserted condition is false on every call.
+ return -1; // Always -1. NOTE(review): presumably only reached if ink_assert returns — confirm.
+ }
+ int set_header(void *ptr, int len) // Unimplemented stub; ptr and len are never read. Signature matches the pure virtual CacheVConnection::set_header declared in I_Cache.h.
+ {
+ ink_assert(!"implemented"); // !"..." negates a non-null string literal, so the asserted condition is false on every call.
+ return -1; // Always -1. NOTE(review): presumably only reached if ink_assert returns — confirm.
+ }
+ int get_object_size() // Unimplemented stub. Signature matches the pure virtual CacheVConnection::get_object_size declared in I_Cache.h.
+ {
+ ink_assert(!"implemented"); // !"..." negates a non-null string literal, so the asserted condition is false on every call.
+ return -1; // Always -1. NOTE(review): presumably only reached if ink_assert returns — confirm.
+ }
+ VIO *do_io_pread(Continuation *c, ink64 nbytes, MIOBuffer *buf, ink_off_t off) { // Unimplemented stub; no argument is read. Signature matches the pure virtual CacheVConnection::do_io_pread declared in I_Cache.h.
+ ink_assert(!"implemented"); // !"..." negates a non-null string literal, so the asserted condition is false on every call.
+ return 0; // Always a null VIO pointer. NOTE(review): presumably only reached if ink_assert returns — confirm.
+ }
bool appendCacheHttpInfo(const void *data, const inku64 size);
bool completeCacheHttpInfo(const void *data, const inku64 size);