You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2015/07/19 16:14:47 UTC
[7/8] trafficserver git commit: TS-974: Partial Object Caching.
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/1c06db83/iocore/cache/CacheRead.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheRead.cc b/iocore/cache/CacheRead.cc
index e8ff804..4523c2a 100644
--- a/iocore/cache/CacheRead.cc
+++ b/iocore/cache/CacheRead.cc
@@ -48,14 +48,14 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheFragType type, co
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
c = new_CacheVC(cont);
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+ c->vol = vol;
+ c->first_key = c->key = c->earliest_key = *key;
c->vio.op = VIO::READ;
c->base_stat = cache_read_active_stat;
- CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
- c->first_key = c->key = c->earliest_key = *key;
- c->vol = vol;
- c->frag_type = type;
c->od = od;
+ c->frag_type = type;
+ CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
}
if (!c)
goto Lmiss;
@@ -91,7 +91,46 @@ Lcallreturn:
return &c->_action;
}
-#ifdef HTTP_CACHE
+
+/** Open a cache read on the object that @a vc is currently writing.
+ *
+ * The read VC copies the volume, first key, earliest key, open directory entry, fragment type,
+ * first directory entry and first buffer from the write VC, shallow copies
+ * @a client_request_hdr, and starts in @c CacheVC::openReadFromWriter.
+ *
+ * @param cont Continuation that receives the read events.
+ * @param vc The writing VC. If it is not a @c CacheVC nothing is opened.
+ * @param client_request_hdr Client request used by the read VC.
+ * @return @c ACTION_RESULT_DONE if no read VC was created or it finished immediately,
+ *   otherwise the read VC's action.
+ */
+Action *
+Cache::open_read(Continuation *cont, CacheVConnection *vc, HTTPHdr *client_request_hdr)
+{
+  Action *zret = ACTION_RESULT_DONE;
+
+  CacheVC *write_vc = dynamic_cast<CacheVC *>(vc);
+  // NOTE(review): when @a vc is not a CacheVC, @a cont gets no callback at all - confirm callers expect that.
+  if (write_vc) {
+    Vol *vol = write_vc->vol;
+    ProxyMutex *mutex = cont->mutex; // needed for stat macros
+    CacheVC *c = new_CacheVC(cont);
+
+    c->vol = vol;
+    c->first_key = write_vc->first_key;
+    // [amc] Need to fix this as it's pointless. In general @a earliest_key in the write VC
+    // won't be the correct value - it's randomly generated and for a partial fill won't be
+    // set to the actual alternate value until later (in @c set_http_info).
+    c->earliest_key = c->key = write_vc->earliest_key;
+    c->vio.op = VIO::READ;
+    c->base_stat = cache_read_active_stat;
+    c->od = write_vc->od;
+    c->frag_type = write_vc->frag_type;
+    CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+    c->request.copy_shallow(client_request_hdr);
+    c->params = write_vc->params; // seems to be a no-op, always NULL.
+    c->dir = c->first_dir = write_vc->first_dir;
+    c->write_vc = write_vc;
+    c->first_buf = write_vc->first_buf; // I don't think this is effective either.
+    SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+    zret = &c->_action; // default, override if needed.
+    CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+    if (!lock.is_locked()) {
+      // A lock miss must still get @a c running: otherwise nothing ever dispatches it, the VC
+      // leaks and @a cont waits forever. Schedule a retry; openReadFromWriter takes the volume
+      // lock itself when it runs.
+      CONT_SCHED_LOCK_RETRY(c);
+    } else if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE) {
+      zret = ACTION_RESULT_DONE;
+    }
+  }
+  return zret;
+}
+
Action *
Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type,
const char *hostname, int host_len)
@@ -112,15 +151,15 @@ Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request,
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
c = new_CacheVC(cont);
- c->first_key = c->key = c->earliest_key = *key;
c->vol = vol;
+ c->first_key = c->key = c->earliest_key = *key;
c->vio.op = VIO::READ;
c->base_stat = cache_read_active_stat;
+ c->od = od;
+ c->frag_type = CACHE_FRAG_TYPE_HTTP;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->request.copy_shallow(request);
- c->frag_type = CACHE_FRAG_TYPE_HTTP;
c->params = params;
- c->od = od;
}
if (!lock.is_locked()) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
@@ -160,27 +199,55 @@ Lcallreturn:
return ACTION_RESULT_DONE;
return &c->_action;
}
-#endif
+/** Load the alternate handles stored in the header area of @a doc into @a info.
+ *
+ * When the handles loaded successfully, @c cache_config_compatibility_4_2_0_fixup is set,
+ * the fragment was not taken from the RAM cache, and the volume header version is 23.0,
+ * the MIME accelerators and presence bits of every alternate's response and request
+ * headers are recomputed.
+ *
+ * @return The value returned by @c get_handles; @c static_cast<uint32_t>(-1) means failure.
+ */
uint32_t
CacheVC::load_http_info(CacheHTTPInfoVector *info, Doc *doc, RefCountObj *block_ptr)
{
uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
- if (cache_config_compatibility_4_2_0_fixup && // manual override not engaged
+ if (zret != static_cast<uint32_t>(-1) && // Make sure we haven't already failed
+ cache_config_compatibility_4_2_0_fixup && // manual override not engaged
!this->f.doc_from_ram_cache && // it's already been done for ram cache fragments
vol->header->version.ink_major == 23 && vol->header->version.ink_minor == 0) {
for (int i = info->xcount - 1; i >= 0; --i) {
- info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
- info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
+ info->data(i)._alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
+ info->data(i)._alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
}
}
return zret;
}
+/// Boundary string of the multipart range response; its length is written to @a len.
+char const *
+CacheVC::get_http_range_boundary_string(int *len) const
+{
+  char const *boundary = resp_range.getBoundaryStr(len);
+  return boundary;
+}
+
+/// Number of content bytes this read delivers: the content length computed by the range
+/// spec when ranges were requested, otherwise the full object size of the alternate.
+int64_t
+CacheVC::get_effective_content_size()
+{
+  if (!resp_range.hasRanges())
+    return alternate.object_size_get();
+  return resp_range.calcContentLength();
+}
+
+/** Close this read VC's use of the open directory entry, if it has one, then free the VC.
+ *
+ * Closing requires the volume lock. On a lock miss the handler is set to this function and
+ * a retry is requested with VC_SCHED_LOCK_RETRY.
+ * NOTE(review): @a od is not cleared here after vol->close_read(this) - confirm close_read
+ * resets it, since free_CacheVC runs immediately afterwards.
+ */
+int
+CacheVC::closeReadAndFree(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+ // cancel_trigger(); // ??
+ if (od) {
+ CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+ if (!lock.is_locked()) {
+ // Come back to this handler once the volume lock can be taken.
+ SET_HANDLER(&CacheVC::closeReadAndFree);
+ VC_SCHED_LOCK_RETRY();
+ }
+ vol->close_read(this);
+ }
+ return free_CacheVC(this);
+}
+
int
CacheVC::openReadFromWriterFailure(int event, Event *e)
{
- od = NULL;
+ // od = NULL;
+ vol->close_read(this);
vector.clear(false);
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
CACHE_INCREMENT_DYN_STAT(cache_read_busy_failure_stat);
@@ -190,105 +257,6 @@ CacheVC::openReadFromWriterFailure(int event, Event *e)
}
int
-CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
-{
- intptr_t err = ECACHE_DOC_BUSY;
- CacheVC *w = NULL;
-
- ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == NULL);
-
- if (!od)
- return EVENT_RETURN;
-
- if (frag_type != CACHE_FRAG_TYPE_HTTP) {
- ink_assert(od->num_writers == 1);
- w = od->writers.head;
- if (w->start_time > start_time || w->closed < 0) {
- od = NULL;
- return EVENT_RETURN;
- }
- if (!w->closed)
- return -err;
- write_vc = w;
- }
-#ifdef HTTP_CACHE
- else {
- write_vector = &od->vector;
- int write_vec_cnt = write_vector->count();
- for (int c = 0; c < write_vec_cnt; c++)
- vector.insert(write_vector->get(c));
- // check if all the writers who came before this reader have
- // set the http_info.
- for (w = (CacheVC *)od->writers.head; w; w = (CacheVC *)w->opendir_link.next) {
- if (w->start_time > start_time || w->closed < 0)
- continue;
- if (!w->closed && !cache_config_read_while_writer) {
- return -err;
- }
- if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT)
- continue;
-
- if (!w->closed && !w->alternate.valid()) {
- od = NULL;
- ink_assert(!write_vc);
- vector.clear(false);
- return EVENT_CONT;
- }
- // construct the vector from the writers.
- int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
- if (w->f.update) {
- // all Update cases. Need to get the alternate index.
- alt_ndx = get_alternate_index(&vector, w->update_key);
- // if its an alternate delete
- if (!w->alternate.valid()) {
- if (alt_ndx >= 0)
- vector.remove(alt_ndx, false);
- continue;
- }
- }
- ink_assert(w->alternate.valid());
- if (w->alternate.valid())
- vector.insert(&w->alternate, alt_ndx);
- }
-
- if (!vector.count()) {
- if (od->reading_vec) {
- // the writer(s) are reading the vector, so there is probably
- // an old vector. Since this reader came before any of the
- // current writers, we should return the old data
- od = NULL;
- return EVENT_RETURN;
- }
- return -ECACHE_NO_DOC;
- }
- if (cache_config_select_alternate) {
- alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
- if (alternate_index < 0)
- return -ECACHE_ALT_MISS;
- } else
- alternate_index = 0;
- CacheHTTPInfo *obj = vector.get(alternate_index);
- for (w = (CacheVC *)od->writers.head; w; w = (CacheVC *)w->opendir_link.next) {
- if (obj->m_alt == w->alternate.m_alt) {
- write_vc = w;
- break;
- }
- }
- vector.clear(false);
- if (!write_vc) {
- DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
- od = NULL;
- return EVENT_RETURN;
- }
-
- DDebug("cache_read_agg", "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p", this, first_key.slice32(1),
- write_vc->earliest_key.slice32(1), vector.count(), alternate_index, od->num_writers, write_vc);
- }
-#endif // HTTP_CACHE
- return EVENT_NONE;
-}
-
-int
CacheVC::openReadFromWriter(int event, Event *e)
{
if (!f.read_from_writer_called) {
@@ -304,167 +272,70 @@ CacheVC::openReadFromWriter(int event, Event *e)
f.read_from_writer_called = 1;
}
cancel_trigger();
- intptr_t err = ECACHE_DOC_BUSY;
DDebug("cache_read_agg", "%p: key: %X In openReadFromWriter", this, first_key.slice32(1));
-#ifndef READ_WHILE_WRITER
- return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
-#else
+
if (_action.cancelled) {
- od = NULL; // only open for read so no need to close
- return free_CacheVC(this);
+ return this->closeReadAndFree(0, NULL);
+ // od = NULL; // only open for read so no need to close
+ // return free_CacheVC(this);
}
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked())
VC_SCHED_LOCK_RETRY();
- od = vol->open_read(&first_key); // recheck in case the lock failed
- if (!od) {
+ if (!od && NULL == (od = vol->open_read(&first_key))) {
MUTEX_RELEASE(lock);
write_vc = NULL;
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(event, e);
- } else
- ink_assert(od == vol->open_read(&first_key));
- if (!write_vc) {
- int ret = openReadChooseWriter(event, e);
- if (ret < 0) {
- MUTEX_RELEASE(lock);
- SET_HANDLER(&CacheVC::openReadFromWriterFailure);
- return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *>(ret));
- } else if (ret == EVENT_RETURN) {
- MUTEX_RELEASE(lock);
- SET_HANDLER(&CacheVC::openReadStartHead);
- return openReadStartHead(event, e);
- } else if (ret == EVENT_CONT) {
- ink_assert(!write_vc);
- VC_SCHED_WRITER_RETRY();
- } else
- ink_assert(write_vc);
- } else {
- if (writer_done()) {
- MUTEX_RELEASE(lock);
- DDebug("cache_read_agg", "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
- od = NULL;
- write_vc = NULL;
- SET_HANDLER(&CacheVC::openReadStartHead);
- return openReadStartHead(event, e);
- }
- }
-#ifdef HTTP_CACHE
- OpenDirEntry *cod = od;
-#endif
- od = NULL;
- // someone is currently writing the document
- if (write_vc->closed < 0) {
- MUTEX_RELEASE(lock);
- write_vc = NULL;
- // writer aborted, continue as if there is no writer
- SET_HANDLER(&CacheVC::openReadStartHead);
- return openReadStartHead(EVENT_IMMEDIATE, 0);
- }
- // allow reading from unclosed writer for http requests only.
- ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
- if (!write_vc->closed && !write_vc->fragment) {
- if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP ||
- writer_lock_retry >= cache_config_read_while_writer_max_retries) {
- MUTEX_RELEASE(lock);
- return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
- }
- DDebug("cache_read_agg", "%p: key: %X writer: closed:%d, fragment:%d, retry: %d", this, first_key.slice32(1), write_vc->closed,
- write_vc->fragment, writer_lock_retry);
- VC_SCHED_WRITER_RETRY();
}
- CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
- if (!writer_lock.is_locked()) {
- DDebug("cache_read_agg", "%p: key: %X lock miss", this, first_key.slice32(1));
+ CACHE_TRY_LOCK(lock_od, od->mutex, mutex->thread_holding);
+ if (!lock_od.is_locked())
VC_SCHED_LOCK_RETRY();
- }
- MUTEX_RELEASE(lock);
- if (!write_vc->io.ok())
- return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
-#ifdef HTTP_CACHE
- if (frag_type == CACHE_FRAG_TYPE_HTTP) {
- DDebug("cache_read_agg", "%p: key: %X http passed stage 1, closed: %d, frag: %d", this, first_key.slice32(1), write_vc->closed,
- write_vc->fragment);
- if (!write_vc->alternate.valid())
- return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
- alternate.copy(&write_vc->alternate);
- vector.insert(&alternate);
- alternate.object_key_get(&key);
- write_vc->f.readers = 1;
- if (!(write_vc->f.update && write_vc->total_len == 0)) {
- key = write_vc->earliest_key;
- if (!write_vc->closed)
- alternate.object_size_set(write_vc->vio.nbytes);
- else
- alternate.object_size_set(write_vc->total_len);
- } else {
- key = write_vc->update_key;
- ink_assert(write_vc->closed);
- DDebug("cache_read_agg", "%p: key: %X writer header update", this, first_key.slice32(1));
- // Update case (b) : grab doc_len from the writer's alternate
- doc_len = alternate.object_size_get();
- if (write_vc->update_key == cod->single_doc_key && (cod->move_resident_alt || write_vc->f.rewrite_resident_alt) &&
- write_vc->first_buf._ptr()) {
- // the resident alternate is being updated and its a
- // header only update. The first_buf of the writer has the
- // document body.
- Doc *doc = (Doc *)write_vc->first_buf->data();
- writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
- MUTEX_RELEASE(writer_lock);
- ink_assert(doc_len == doc->data_len());
- length = doc_len;
- f.single_fragment = 1;
- doc_pos = 0;
- earliest_key = key;
- dir_clean(&first_dir);
- dir_clean(&earliest_dir);
- SET_HANDLER(&CacheVC::openReadFromWriterMain);
- CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
- return callcont(CACHE_EVENT_OPEN_READ);
- }
- // want to snarf the new headers from the writer
- // and then continue as if nothing happened
- last_collision = NULL;
- MUTEX_RELEASE(writer_lock);
- SET_HANDLER(&CacheVC::openReadStartEarliest);
- return openReadStartEarliest(event, e);
+ if (od->open_writer) {
+ // Alternates are in flux, wait for origin server response to update them.
+ if (!od->open_waiting.in(this)) {
+ wake_up_thread = mutex->thread_holding;
+ od->open_waiting.push(this);
}
- } else {
-#endif // HTTP_CACHE
- DDebug("cache_read_agg", "%p: key: %X non-http passed stage 1", this, first_key.slice32(1));
- key = write_vc->earliest_key;
-#ifdef HTTP_CACHE
+ Debug("amc", "[CacheVC::openReadFromWriter] waiting for %p", od->open_writer);
+ return EVENT_CONT; // wait for the writer to wake us up.
}
-#endif
- if (write_vc->fragment) {
- doc_len = write_vc->vio.nbytes;
- last_collision = NULL;
- DDebug("cache_read_agg", "%p: key: %X closed: %d, fragment: %d, len: %d starting first fragment", this, first_key.slice32(1),
- write_vc->closed, write_vc->fragment, (int)doc_len);
- MUTEX_RELEASE(writer_lock);
- // either a header + body update or a new document
+
+ MUTEX_RELEASE(lock); // we have the OD lock now, don't need the vol lock.
+
+ if (write_vc && CACHE_ALT_INDEX_DEFAULT != (alternate_index = get_alternate_index(&(od->vector), write_vc->earliest_key))) {
+ // Found the alternate for our write VC. Really, though, if we have a write_vc we should never fail to get
+ // the alternate - we should probably check for that.
+ alternate.copy_shallow(od->vector.get(alternate_index));
+ key = earliest_key = alternate.object_key_get();
+ doc_len = alternate.object_size_get();
+ Debug("amc", "[openReadFromWriter] - setting alternate from write_vc %p to #%d : %p", write_vc, alternate_index,
+ alternate.m_alt);
+ MUTEX_RELEASE(lock_od);
SET_HANDLER(&CacheVC::openReadStartEarliest);
return openReadStartEarliest(event, e);
+ } else {
+ if (cache_config_select_alternate) {
+ alternate_index = HttpTransactCache::SelectFromAlternates(&od->vector, &request, params);
+ if (alternate_index < 0) {
+ MUTEX_RELEASE(lock_od);
+ SET_HANDLER(&CacheVC::openReadFromWriterFailure);
+ return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *>(-ECACHE_ALT_MISS));
+ }
+ Debug("amc", "[openReadFromWriter] select alt: %d %p (current %p)", alternate_index, od->vector.get(alternate_index)->m_alt,
+ alternate.m_alt);
+ write_vector = &od->vector;
+ } else {
+ alternate_index = 0;
+ }
+ MUTEX_RELEASE(lock_od);
+ SET_HANDLER(&CacheVC::openReadStartHead);
+ return openReadStartHead(event, e);
}
- writer_buf = write_vc->blocks;
- writer_offset = write_vc->offset;
- length = write_vc->length;
- // copy the vector
- f.single_fragment = !write_vc->fragment; // single fragment doc
- doc_pos = 0;
- earliest_key = write_vc->earliest_key;
- ink_assert(earliest_key == key);
- doc_len = write_vc->total_len;
- dir_clean(&first_dir);
- dir_clean(&earliest_dir);
- DDebug("cache_read_agg", "%p: key: %X %X: single fragment read", this, first_key.slice32(1), key.slice32(0));
- MUTEX_RELEASE(writer_lock);
- SET_HANDLER(&CacheVC::openReadFromWriterMain);
- CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
- return callcont(CACHE_EVENT_OPEN_READ);
-#endif // READ_WHILE_WRITER
+ ink_assert(false);
+ return EVENT_DONE; // should not get here.
}
int
@@ -575,6 +446,8 @@ CacheVC::openReadReadDone(int event, Event *e)
goto Lcallreturn;
return EVENT_CONT;
} else if (write_vc) {
+ ink_release_assert(!"[amc] Handle this");
+#if 0
if (writer_done()) {
last_collision = NULL;
while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
@@ -589,6 +462,7 @@ CacheVC::openReadReadDone(int event, Event *e)
}
DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
VC_SCHED_WRITER_RETRY(); // wait for writer
+#endif
}
// fall through for truncated documents
}
@@ -596,126 +470,144 @@ Lerror:
char tmpstring[100];
Warning("Document %s truncated", earliest_key.toHexStr(tmpstring));
return calluser(VC_EVENT_ERROR);
-Ldone:
+ // Ldone:
return calluser(VC_EVENT_EOS);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, 0);
LreadMain:
- fragment++;
+ ++fragment;
doc_pos = doc->prefix_len();
+ doc_pos += resp_range.getOffset() - frag_upper_bound; // used before update!
+ frag_upper_bound += doc->data_len();
next_CacheKey(&key, &key);
SET_HANDLER(&CacheVC::openReadMain);
return openReadMain(event, e);
}
+/** Position this VC so the next fragment read fetches fragment @a target.
+ *
+ * Sets @a key to the key of fragment @a target (the earliest key for fragment 0) and
+ * @a fragment to @a target. The read-done path increments @a fragment after the read, which
+ * restores the invariant used elsewhere ("@a fragment is one past the last fragment read").
+ * Previously @a fragment was only updated for target 0, leaving it stale after any other seek.
+ *
+ * @param target Fragment index to read next.
+ */
+void
+CacheVC::update_key_to_frag_idx(int target)
+{
+  if (0 == target) {
+    key = earliest_key;
+  } else {
+    FragmentDescriptor *frag = alternate.force_frag_at(target);
+    ink_assert(frag);
+    key = frag->m_key;
+  }
+  fragment = target;
+}
+
+/** Find the index of the fragment containing byte @a offset of the object.
+ *
+ * Fragment @c i covers [start(i), start(i + 1)), where start(0) is 0, start(i) for i > 0 is
+ * @c (*frags)[i].m_offset, and the last fragment ends at @a doc_len. The result is the last
+ * fragment whose start is <= @a offset, so empty fragments are skipped.
+ *
+ * The previous search read @c (*frags)[count] when probing the last fragment, could start
+ * from an index past the table (the @c offset / fixed-size guess), and could cycle between
+ * two indices forever. This version is a bounded binary search.
+ *
+ * @param offset Byte offset in the object; must be less than @a doc_len.
+ * @return Fragment index in [0, count), or 0 when there is no fragment table or only one fragment.
+ */
+int
+CacheVC::frag_idx_for_offset(uint64_t offset)
+{
+  FragmentDescriptorTable *frags = alternate.get_frag_table();
+  int count = alternate.get_frag_count();
+
+  ink_assert(offset < doc_len);
+
+  if (!frags || count <= 1)
+    return 0;
+
+  // Invariant: the answer is in [lo, hi]. start(0) is treated as 0, so (*frags)[0] is never read.
+  int lo = 0;
+  int hi = count - 1;
+  while (lo < hi) {
+    int mid = lo + (hi - lo + 1) / 2; // round up: guarantees progress and mid >= 1
+    if ((*frags)[mid].m_offset <= offset)
+      lo = mid;
+    else
+      hi = mid - 1;
+  }
+  return lo;
+}
+
+/* There is a fragment available, decide what do to next.
+ */
int
CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
Doc *doc = (Doc *)buf->data();
- int64_t ntodo = vio.ntodo();
- int64_t bytes = doc->len - doc_pos;
+ int64_t bytes = vio.ntodo();
IOBufferBlock *b = NULL;
- if (seek_to) { // handle do_io_pread
- if (seek_to >= doc_len) {
- vio.ndone = doc_len;
- return calluser(VC_EVENT_EOS);
+ uint64_t target_offset = resp_range.getOffset();
+ uint64_t lower_bound = frag_upper_bound - doc->data_len();
+
+ if (bytes <= 0)
+ return EVENT_CONT;
+
+ // Start shipping
+ while (bytes > 0 && lower_bound <= target_offset && target_offset < frag_upper_bound) {
+ if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // wait for reader
+ return EVENT_CONT;
+
+ if (resp_range.hasPendingRangeShift()) { // in new range, shift to start location.
+ int b_len;
+ char const *b_str = resp_range.getBoundaryStr(&b_len);
+ size_t r_idx = resp_range.getIdx();
+
+ doc_pos = doc->prefix_len() + (target_offset - lower_bound);
+
+ vio.ndone +=
+ HTTPRangeSpec::writePartBoundary(vio.buffer.writer(), b_str, b_len, doc_len, resp_range[r_idx]._min, resp_range[r_idx]._max,
+ resp_range.getContentTypeField(), r_idx >= (resp_range.count() - 1));
+ resp_range.consumeRangeShift();
+ Debug("amc", "Range boundary for range %" PRIu64, r_idx);
}
-#ifdef HTTP_CACHE
- HTTPInfo::FragOffset *frags = alternate.get_frag_table();
- if (is_debug_tag_set("cache_seek")) {
- char b[33], c[33];
- Debug("cache_seek", "Seek @ %" PRId64 " in %s from #%d @ %" PRId64 "/%d:%s", seek_to, first_key.toHexStr(b), fragment,
- doc_pos, doc->len, doc->key.toHexStr(c));
+
+ bytes = std::min(doc->len - doc_pos, static_cast<int64_t>(resp_range.getRemnantSize()));
+ bytes = std::min(bytes, vio.ntodo());
+ if (bytes > 0) {
+ b = new_IOBufferBlock(buf, bytes, doc_pos);
+ b->_buf_end = b->_end;
+ vio.buffer.writer()->append_block(b);
+ vio.ndone += bytes;
+ doc_pos += bytes;
+ resp_range.consume(bytes);
+ Debug("amc", "shipped %" PRId64 " bytes at target offset %" PRIu64, bytes, target_offset);
+ target_offset = resp_range.getOffset();
}
- /* Because single fragment objects can migrate to hang off an alt vector
- they can appear to the VC as multi-fragment when they are not really.
- The essential difference is the existence of a fragment table.
- */
- if (frags) {
- int target = 0;
- HTTPInfo::FragOffset next_off = frags[target];
- int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
- ink_assert(lfi >= 0); // because it's not a single frag doc.
-
- /* Note: frag[i].offset is the offset of the first byte past the
- i'th fragment. So frag[0].offset is the offset of the first
- byte of fragment 1. In addition the # of fragments is one
- more than the fragment table length, the start of the last
- fragment being the last offset in the table.
- */
- if (fragment == 0 || seek_to < frags[fragment - 1] || (fragment <= lfi && frags[fragment] <= seek_to)) {
- // search from frag 0 on to find the proper frag
- while (seek_to >= next_off && target < lfi) {
- next_off = frags[++target];
- }
- if (target == lfi && seek_to >= next_off)
- ++target;
- } else { // shortcut if we are in the fragment already
- target = fragment;
- }
- if (target != fragment) {
- // Lread will read the next fragment always, so if that
- // is the one we want, we don't need to do anything
- int cfi = fragment;
- --target;
- while (target > fragment) {
- next_CacheKey(&key, &key);
- ++fragment;
- }
- while (target < fragment) {
- prev_CacheKey(&key, &key);
- --fragment;
- }
- if (is_debug_tag_set("cache_seek")) {
- char target_key_str[33];
- key.toHexStr(target_key_str);
- Debug("cache_seek", "Seek #%d @ %" PRId64 " -> #%d @ %" PRId64 ":%s", cfi, doc_pos, target, seek_to, target_key_str);
- }
- goto Lread;
+ if (vio.ntodo() <= 0)
+ return calluser(VC_EVENT_READ_COMPLETE);
+ else if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
+ return EVENT_DONE;
+ }
+
+
+#ifdef HTTP_CACHE
+ if (resp_range.getRemnantSize()) {
+ FragmentDescriptorTable *frags = alternate.get_frag_table();
+ int n_frags = alternate.get_frag_count();
+
+ // Quick check for offset in next fragment - very common
+ if (target_offset >= frag_upper_bound && (!frags || fragment >= n_frags || target_offset <= (*frags)[fragment].m_offset)) {
+ Debug("amc", "Non-seeking continuation to next fragment");
+ } else {
+ int target = -1; // target fragment index.
+
+ if (is_debug_tag_set("amc")) {
+ char b[33], c[33];
+ Debug("amc", "Seek @ %" PRIu64 " [r#=%d] in %s from #%d @ %" PRIu64 "/%d/%" PRId64 ":%s%s", target_offset,
+ resp_range.getIdx(), first_key.toHexStr(b), fragment, frag_upper_bound, doc->len, doc->total_len,
+ doc->key.toHexStr(c), (frags ? "" : "no frag table"));
}
+
+ target = this->frag_idx_for_offset(target_offset);
+ this->update_key_to_frag_idx(target);
+ /// one frag short, because it gets bumped when the fragment is actually read.
+ frag_upper_bound = target > 0 ? (*frags)[target].m_offset : 0;
+ Debug("amc", "Fragment seek from %d to %d target offset %" PRIu64, fragment - 1, target, target_offset);
}
- doc_pos = doc->prefix_len() + seek_to;
- if (fragment)
- doc_pos -= static_cast<int64_t>(frags[fragment - 1]);
- vio.ndone = 0;
- seek_to = 0;
- ntodo = vio.ntodo();
- bytes = doc->len - doc_pos;
- if (is_debug_tag_set("cache_seek")) {
- char target_key_str[33];
- key.toHexStr(target_key_str);
- Debug("cache_seek", "Read # %d @ %" PRId64 "/%d for %" PRId64, fragment, doc_pos, doc->len, bytes);
- }
-#endif
}
- if (ntodo <= 0)
- return EVENT_CONT;
- if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
- return EVENT_CONT;
- if ((bytes <= 0) && vio.ntodo() >= 0)
- goto Lread;
- if (bytes > vio.ntodo())
- bytes = vio.ntodo();
- b = new_IOBufferBlock(buf, bytes, doc_pos);
- b->_buf_end = b->_end;
- vio.buffer.writer()->append_block(b);
- vio.ndone += bytes;
- doc_pos += bytes;
- if (vio.ntodo() <= 0)
- return calluser(VC_EVENT_READ_COMPLETE);
- else {
- if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
- return EVENT_DONE;
- // we have to keep reading until we give the user all the
- // bytes it wanted or we hit the watermark.
- if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water())
- goto Lread;
- return EVENT_CONT;
- }
-Lread : {
- if (vio.ndone >= (int64_t)doc_len)
+#endif
+
+ if (vio.ntodo() > 0 && 0 == resp_range.getRemnantSize())
// reached the end of the document and the user still wants more
return calluser(VC_EVENT_EOS);
last_collision = 0;
@@ -733,25 +625,20 @@ Lread : {
if (dir_probe(&key, vol, &dir, &last_collision)) {
SET_HANDLER(&CacheVC::openReadReadDone);
int ret = do_read_call(&key);
- if (ret == EVENT_RETURN)
- goto Lcallreturn;
+ if (ret == EVENT_RETURN) {
+ lock.release();
+ return handleEvent(AIO_EVENT_DONE, 0);
+ }
return EVENT_CONT;
- } else if (write_vc) {
- if (writer_done()) {
- last_collision = NULL;
- while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
- if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
- DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d", this, first_key.slice32(1), (int)vio.ndone);
- doc_len = vio.ndone;
- goto Leos;
- }
- }
+ } else {
+ if (!od->wait_for(earliest_key, this, target_offset)) {
DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
- goto Lerror;
+ lock.release();
+ return calluser(VC_EVENT_ERROR);
}
- DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
+ DDebug("cache_read_agg", "%p: key: %X ReadMain waiting: %d", this, first_key.slice32(1), (int)vio.ndone);
SET_HANDLER(&CacheVC::openReadMain);
- VC_SCHED_WRITER_RETRY();
+ return EVENT_CONT;
}
if (is_action_tag_set("cache"))
ink_release_assert(false);
@@ -759,15 +646,45 @@ Lread : {
key.slice32(1));
// remove the directory entry
dir_delete(&earliest_key, vol, &earliest_dir);
-}
-Lerror:
+ lock.release();
+ // Lerror:
return calluser(VC_EVENT_ERROR);
-Leos:
+ // Leos:
return calluser(VC_EVENT_EOS);
-Lcallreturn:
- return handleEvent(AIO_EVENT_DONE, 0);
}
+/** Wait-state handler used while the earliest fragment of the alternate is not yet readable.
+ *
+ * Under the volume lock (retried on a miss):
+ * - If the object is no longer open for read/write, restart from @a first_key in
+ *   @c openReadStartHead.
+ * - Otherwise, if @a key is found by the directory or lookaside probe, start reading it and
+ *   continue in @c openReadStartEarliest.
+ * - Otherwise return @c EVENT_CONT and stay in this state.
+ */
+int
+CacheVC::openReadWaitEarliest(int evid, Event *)
+{
+  cancel_trigger();
+
+  CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+  if (!lock.is_locked())
+    VC_SCHED_LOCK_RETRY();
+  Debug("amc", "[CacheVC::openReadWaitEarliest] [%d]", evid);
+
+  bool const writer_gone = (NULL == vol->open_read(&first_key));
+  if (writer_gone) {
+    // No writer, so nothing more will arrive. Start over from the first fragment; most likely
+    // the object turned out to be a resident alternate with no separate earliest fragment.
+    lock.release();
+    SET_HANDLER(&self::openReadStartHead);
+    key = first_key;
+    return handleEvent(EVENT_IMMEDIATE, 0);
+  }
+
+  bool const earliest_found =
+    dir_probe(&key, vol, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, vol, &earliest_dir, NULL);
+  if (!earliest_found)
+    return EVENT_CONT; // not in the directory yet, keep waiting
+
+  dir = earliest_dir;
+  SET_HANDLER(&self::openReadStartEarliest);
+  int const read_status = do_read_call(&key);
+  if (EVENT_RETURN != read_status)
+    return read_status;
+  lock.release();
+  return handleEvent(AIO_EVENT_DONE, 0);
+}
+
+
/*
This code follows CacheVC::openReadStartHead closely,
if you change this you might have to change that.
@@ -822,6 +739,8 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
earliest_key = key;
doc_pos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ frag_upper_bound = doc->data_len();
vol->begin_read(this);
if (vol->within_hit_evacuate_window(&earliest_dir) &&
(!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)) {
@@ -840,9 +759,22 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
// read has detected that alternate does not exist in the cache.
// rewrite the vector.
#ifdef HTTP_CACHE
- if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
+ // It's OK if there's a writer for this alternate, we can wait on it.
+ if (od && od->has_writer(earliest_key)) {
+ wake_up_thread = mutex->thread_holding;
+ od->wait_for(earliest_key, this, 0);
+ lock.release();
+ // The SM must be signaled that the cache read is open even if we haven't got the earliest frag
+ // yet because otherwise it won't set up the read side of the tunnel before the write side finishes
+ // and terminates the SM (in the case of a resident alternate). But the VC can't be left with this
+ // handler or it will confuse itself when it wakes up from the earliest frag read. So we put it
+ // in a special wait state / handler and then signal the SM.
+ SET_HANDLER(&self::openReadWaitEarliest);
+ return callcont(CACHE_EVENT_OPEN_READ); // must signal read is open
+ } else if (frag_type == CACHE_FRAG_TYPE_HTTP) {
// don't want any writers while we are evacuating the vector
- if (!vol->open_write(this, false, 1)) {
+ ink_release_assert(!"[amc] Not handling multiple writers with vector evacuate");
+ if (!vol->open_write(this)) {
Doc *doc1 = (Doc *)first_buf->data();
uint32_t len = this->load_http_info(write_vector, doc1);
ink_assert(len == doc1->hlen && write_vector->count() > 0);
@@ -939,6 +871,8 @@ CacheVC::openReadVecWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */
if (od->move_resident_alt)
dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
+ Debug("amc", "[openReadVecWrite] select alt: %d %p (current %p)", alt_ndx, write_vector->get(alt_ndx)->m_alt,
+ alternate.m_alt);
vol->close_write(this);
if (alt_ndx >= 0) {
vector.clear();
@@ -963,6 +897,10 @@ Lrestart:
/*
This code follows CacheVC::openReadStartEarliest closely,
if you change this you might have to change that.
+
+ This handles the I/O completion of reading the first doc of the object.
+ If there are alternates, we chain to openreadStartEarliest to read the
+ earliest doc.
*/
int
CacheVC::openReadStartHead(int event, Event *e)
@@ -1044,13 +982,18 @@ CacheVC::openReadStartHead(int event, Event *e)
err = ECACHE_BAD_META_DATA;
goto Ldone;
}
- if (cache_config_select_alternate) {
+ // If @a params is @c NULL then we're a retry from a range request pair so don't do alt select.
+ // Instead try the @a earliest_key - if that's a match then that's the correct alt, written
+ // by the paired write VC.
+ if (cache_config_select_alternate && params) {
alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
if (alternate_index < 0) {
err = ECACHE_ALT_MISS;
goto Ldone;
}
- } else
+ Debug("amc", "[openReadStartHead] select alt: %d %p (current %p, od %p)", alternate_index,
+ vector.get(alternate_index)->m_alt, alternate.m_alt, od);
+ } else if (CACHE_ALT_INDEX_DEFAULT == (alternate_index = get_alternate_index(&vector, earliest_key)))
alternate_index = 0;
alternate_tmp = vector.get(alternate_index);
if (!alternate_tmp->valid()) {
@@ -1064,12 +1007,24 @@ CacheVC::openReadStartHead(int event, Event *e)
alternate.copy_shallow(alternate_tmp);
alternate.object_key_get(&key);
doc_len = alternate.object_size_get();
+
+ // If the object length is known we can check the range.
+ // Otherwise we have to leave it vague and talk to the origin to get full length info.
+ if (alternate.m_alt->m_flag.content_length_p && !resp_range.apply(doc_len)) {
+ err = ECACHE_UNSATISFIABLE_RANGE;
+ goto Ldone;
+ }
+ if (resp_range.isMulti())
+ resp_range.setContentTypeFromResponse(alternate.response_get()).generateBoundaryStr(earliest_key);
+
if (key == doc->key) { // is this my data?
f.single_fragment = doc->single_fragment();
ink_assert(f.single_fragment); // otherwise need to read earliest
ink_assert(doc->hlen);
doc_pos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ frag_upper_bound = doc->data_len();
} else {
f.single_fragment = false;
}
@@ -1077,6 +1032,8 @@ CacheVC::openReadStartHead(int event, Event *e)
#endif
{
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ frag_upper_bound = doc->data_len();
f.single_fragment = doc->single_fragment();
doc_pos = doc->prefix_len();
doc_len = doc->total_len;
@@ -1087,7 +1044,7 @@ CacheVC::openReadStartHead(int event, Event *e)
Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64 " bytes, %d fragments",
doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", doc->len, doc->total_len,
#ifdef HTTP_CACHE
- alternate.get_frag_offset_count()
+ alternate.get_frag_count()
#else
0
#endif
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/1c06db83/iocore/cache/CacheTest.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheTest.cc b/iocore/cache/CacheTest.cc
index fb55123..1921060 100644
--- a/iocore/cache/CacheTest.cc
+++ b/iocore/cache/CacheTest.cc
@@ -387,8 +387,8 @@ EXCLUSIVE_REGRESSION_TEST(cache)(RegressionTest *t, int /* atype ATS_UNUSED */,
r_sequential(t, write_test.clone(), lookup_test.clone(), r_sequential(t, 10, read_test.clone()), remove_test.clone(),
lookup_fail_test.clone(), read_fail_test.clone(), remove_fail_test.clone(), replace_write_test.clone(),
- replace_test.clone(), replace_read_test.clone(), large_write_test.clone(), pread_test.clone(), NULL_PTR)
- ->run(pstatus);
+ replace_test.clone(), replace_read_test.clone(), large_write_test.clone(), pread_test.clone(),
+ NULL_PTR)->run(pstatus);
return;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/1c06db83/iocore/cache/CacheVol.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
index f9047f3..d5d85bc 100644
--- a/iocore/cache/CacheVol.cc
+++ b/iocore/cache/CacheVol.cc
@@ -413,8 +413,9 @@ CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
}
Debug("cache_scan", "trying for writer lock");
- if (vol->open_write(this, false, 1)) {
- writer_lock_retry++;
+ if (vol->open_write(this)) {
+ // [amc] This tried to restrict to one writer, must fix at some point.
+ ++writer_lock_retry;
SET_HANDLER(&CacheVC::scanOpenWrite);
mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
return EVENT_CONT;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/1c06db83/iocore/cache/CacheWrite.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index 3740d21..1fbb16a 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -33,13 +33,19 @@
// used to get the alternate which is actually present in the document
#ifdef HTTP_CACHE
int
-get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
+get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key, int idx)
{
int alt_count = cache_vector->count();
CacheHTTPInfo *obj;
if (!alt_count)
return -1;
+ // See if the hint is correct.
+ if (0 <= idx && idx < alt_count && cache_vector->get(idx)->compare_object_key(&key))
+ return idx;
+ // Otherwise scan the vector.
for (int i = 0; i < alt_count; i++) {
+ if (i == idx)
+ continue; // already checked that one.
obj = cache_vector->get(i);
if (obj->compare_object_key(&key)) {
// Debug("cache_key", "Resident alternate key %X", key.slice32(0));
@@ -63,20 +69,23 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
VC_SCHED_LOCK_RETRY();
int ret = 0;
{
- CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, od->mutex, mutex->thread_holding);
if (!lock.is_locked() || od->writing_vec)
VC_SCHED_LOCK_RETRY();
int vec = alternate.valid();
if (f.update) {
// all Update cases. Need to get the alternate index.
- alternate_index = get_alternate_index(write_vector, update_key);
+ alternate_index = get_alternate_index(write_vector, update_key, alternate_index);
Debug("cache_update", "updating alternate index %d frags %d", alternate_index,
- alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
+ alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_count() : -1);
// if its an alternate delete
if (!vec) {
ink_assert(!total_len);
if (alternate_index >= 0) {
+ MUTEX_TRY_LOCK(stripe_lock, vol->mutex, mutex->thread_holding);
+ if (!stripe_lock.is_locked())
+ VC_SCHED_LOCK_RETRY();
write_vector->remove(alternate_index, true);
alternate_index = CACHE_ALT_REMOVED;
if (!write_vector->count())
@@ -98,16 +107,10 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
write_vector->remove(0, true);
}
if (vec) {
- /* preserve fragment offset data from old info. This method is
- called iff the update is a header only update so the fragment
- data should remain valid.
- */
- if (alternate_index >= 0)
- alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
alternate_index = write_vector->insert(&alternate, alternate_index);
}
- if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
+ if (od->move_resident_alt && first_buf._ptr() /* && !od->has_multiple_writers() */) {
Doc *doc = (Doc *)first_buf->data();
int small_doc = (int64_t)doc->data_len() < (int64_t)cache_config_alt_rewrite_max_size;
int have_res_alt = doc->key == od->single_doc_key;
@@ -134,7 +137,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
od->writing_vec = 1;
f.use_first_key = 1;
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
- ret = do_write_call();
+ ret = do_write_lock_call();
}
if (ret == EVENT_RETURN)
return handleEvent(AIO_EVENT_DONE, 0);
@@ -161,7 +164,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
(f.use_fist_key || f.evac_vector) is set. Write_vector is written to disk
- alternate_index. Used only if write_vector needs to be written to disk.
Used to find out the VC's alternate in the write_vector and set its
- length to tatal_len.
+ length to total_len.
- write_len. The number of bytes for this fragment.
- total_len. The total number of bytes for the document so far.
Doc->total_len and alternate's total len is set to this value.
@@ -316,8 +319,8 @@ Vol::aggWriteDone(int event, Event *e)
header->last_write_pos = header->write_pos;
header->write_pos += io.aiocb.aio_nbytes;
ink_assert(header->write_pos >= start);
- DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n", hash_text.get(), header->write_pos,
- header->last_write_pos);
+ Debug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n", hash_text.get(), header->write_pos,
+ header->last_write_pos);
ink_assert(header->write_pos == header->agg_pos);
if (header->write_pos + EVACUATION_SIZE > scan_pos)
periodic_scan();
@@ -722,7 +725,7 @@ agg_copy(char *p, CacheVC *vc)
IOBufferBlock *res_alt_blk = 0;
uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
- ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
+ ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc || 0 == vc->fragment);
ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
// update copy of directory entry for this document
dir_set_approx_size(&vc->dir, vc->agg_len);
@@ -781,16 +784,23 @@ agg_copy(char *p, CacheVC *vc)
#ifdef HTTP_CACHE
if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
ink_assert(vc->write_vector->count() > 0);
- if (!vc->f.update && !vc->f.evac_vector) {
- ink_assert(!(vc->first_key == zero_key));
- CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
- http_info->object_size_set(vc->total_len);
- }
- // update + data_written => Update case (b)
- // need to change the old alternate's object length
- if (vc->f.update && vc->total_len) {
- CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
- http_info->object_size_set(vc->total_len);
+ if (vc->resp_range.hasRanges()) {
+ int64_t size = vc->alternate.object_size_get();
+ if (size >= 0)
+ doc->total_len = size;
+ } else {
+ // As the header is finalized the fragment vector should be trimmed if the object is complete.
+ if (!vc->f.update && !vc->f.evac_vector) {
+ ink_assert(!(vc->first_key == zero_key));
+ CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+ http_info->object_size_set(vc->total_len);
+ }
+ // update + data_written => Update case (b)
+ // need to change the old alternate's object length
+ if (vc->f.update && vc->total_len) {
+ CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+ http_info->object_size_set(vc->total_len);
+ }
}
ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
@@ -1058,6 +1068,35 @@ Lwait:
}
int
+CacheVC::openWriteEmptyEarliestDone(int event, Event *e) // completes the zero-length earliest Doc write issued by openWriteMain
+{
+  cancel_trigger();
+  if (event == AIO_EVENT_DONE)
+    set_io_not_in_progress();
+  else if (is_io_in_progress())
+    return EVENT_CONT;
+  // NOTE(review): openWriteMain also installs this handler before od->wait_for(); confirm wake-ups are meant to land here.
+  {
+    SCOPED_MUTEX_LOCK(lock, od->mutex, this_ethread());
+    alternate_index = get_alternate_index(write_vector, this->earliest_key); // re-locate our alternate in the OD vector
+    od->write_complete(key, this, io.ok()); // in any case, the IO is over.
+    key = od->key_for(earliest_key, write_pos); // presumably the key for the data at write_pos - confirm key_for semantics
+  }
+  // From here on every event goes to the main write loop, including after an error is reported below.
+  SET_HANDLER(&CacheVC::openWriteMain);
+
+  // On error: terminate if we're already closed, otherwise notify the external continuation.
+  if (!io.ok()) {
+    if (closed) {
+      closed = -1;
+      return die();
+    }
+    return calluser(VC_EVENT_ERROR);
+  }
+  return this->openWriteMain(event, e); // go back to writing our actual data.
+}
+
+int
CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
@@ -1068,6 +1107,7 @@ CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED *
ink_assert(!is_io_in_progress());
VC_SCHED_LOCK_RETRY();
}
+ od->close_writer(earliest_key, this);
vol->close_write(this);
if (closed < 0 && fragment)
dir_delete(&earliest_key, vol, &earliest_dir);
@@ -1169,13 +1209,17 @@ CacheVC::openWriteCloseHead(int event, Event *e)
cancel_trigger();
f.use_first_key = 1;
if (io.ok())
- ink_assert(fragment || (length == (int64_t)total_len));
+ ink_assert(fragment || (length == (int64_t)total_len) ||
+ (resp_range.hasRanges() && alternate.object_size_get() > alternate.get_frag_fixed_size()));
else
return openWriteCloseDir(event, e);
- if (f.data_done)
+ if (f.data_done) {
write_len = 0;
- else
+ } else {
write_len = length;
+ // If we're writing data in the first / header doc, then it's a resident alt.
+ alternate.m_alt->m_flag.complete_p = true;
+ }
#ifdef HTTP_CACHE
if (frag_type == CACHE_FRAG_TYPE_HTTP) {
SET_HANDLER(&CacheVC::updateVector);
@@ -1206,35 +1250,32 @@ CacheVC::openWriteCloseDataDone(int event, Event *e)
CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
if (!lock.is_locked())
VC_LOCK_RETRY_EVENT();
- if (!fragment) {
- ink_assert(key == earliest_key);
- earliest_dir = dir;
-#ifdef HTTP_CACHE
- } else {
- // Store the offset only if there is a table.
- // Currently there is no alt (and thence no table) for non-HTTP.
- if (alternate.valid())
- alternate.push_frag_offset(write_pos);
-#endif
- }
- fragment++;
- write_pos += write_len;
dir_insert(&key, vol, &dir);
- blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
- next_CacheKey(&key, &key);
- if (length) {
- write_len = length;
- if (write_len > MAX_FRAG_SIZE)
- write_len = MAX_FRAG_SIZE;
- if ((ret = do_write_call()) == EVENT_RETURN)
- goto Lcallreturn;
- return ret;
- }
- f.data_done = 1;
- return openWriteCloseHead(event, e); // must be called under vol lock from here
}
-Lcallreturn:
- return handleEvent(AIO_EVENT_DONE, 0);
+
+ if (key == earliest_key)
+ earliest_dir = dir;
+
+ {
+ SCOPED_MUTEX_LOCK(lock, od->mutex, mutex->thread_holding);
+ write_vector->write_complete(earliest_key, this, true);
+ }
+
+ write_pos += write_len;
+ blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
+ next_CacheKey(&key, &key);
+ if (length) {
+ write_len = length;
+ if (write_len > MAX_FRAG_SIZE)
+ write_len = MAX_FRAG_SIZE;
+ if ((ret = do_write_call()) == EVENT_RETURN)
+ return handleEvent(AIO_EVENT_DONE, 0);
+ return ret;
+ }
+
+ f.data_done = 1;
+ return openWriteCloseHead(event, e); // must be called under vol lock from here
+ // [amc] don't see why, guess we'll find out.
}
int
@@ -1267,8 +1308,9 @@ CacheVC::openWriteClose(int event, Event *e)
return openWriteCloseDir(event, e);
#endif
}
- if (length && (fragment || length > MAX_FRAG_SIZE)) {
+ if (length && (fragment || length > MAX_FRAG_SIZE || alternate.object_size_get() > alternate.get_frag_fixed_size())) {
SET_HANDLER(&CacheVC::openWriteCloseDataDone);
+ this->updateWriteStateFromRange();
write_len = length;
if (write_len > MAX_FRAG_SIZE)
write_len = MAX_FRAG_SIZE;
@@ -1296,48 +1338,114 @@ CacheVC::openWriteWriteDone(int event, Event *e)
SET_HANDLER(&CacheVC::openWriteMain);
return calluser(VC_EVENT_ERROR);
}
+
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked())
VC_LOCK_RETRY_EVENT();
- // store the earliest directory. Need to remove the earliest dir
- // in case the writer aborts.
- if (!fragment) {
- ink_assert(key == earliest_key);
- earliest_dir = dir;
-#ifdef HTTP_CACHE
- } else {
- // Store the offset only if there is a table.
- // Currently there is no alt (and thence no table) for non-HTTP.
- if (alternate.valid())
- alternate.push_frag_offset(write_pos);
-#endif
- }
- ++fragment;
- write_pos += write_len;
dir_insert(&key, vol, &dir);
- DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
- blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
- next_CacheKey(&key, &key);
}
+
+ if (key == earliest_key)
+ earliest_dir = dir;
+
+ {
+ SCOPED_MUTEX_LOCK(lock, od->mutex, mutex->thread_holding);
+ write_vector->write_complete(earliest_key, this, true);
+ }
+
+ DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
+
+ resp_range.consume(write_len);
+ blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
+
if (closed)
return die();
SET_HANDLER(&CacheVC::openWriteMain);
return openWriteMain(event, e);
}
-static inline int
-target_fragment_size()
+int64_t
+CacheProcessor::get_fixed_fragment_size() const // payload bytes per fragment: configured target size minus sizeofDoc overhead
{
return cache_config_target_fragment_size - sizeofDoc;
}
+
+Action *
+CacheVC::do_write_init() // runs openWriteInit synchronously; the call sites in this patch are still commented out
+{
+  Debug("amc", "[do_write_init] vc=%p", this);
+  SET_CONTINUATION_HANDLER(this, &CacheVC::openWriteInit);
+  return EVENT_DONE == this->openWriteInit(EVENT_IMMEDIATE, 0) ? ACTION_RESULT_DONE : &_action; // DONE -> nothing pending
+}
+
+/* Attach this writer to its alternate in the OpenDir write vector, wake any readers parked
+   on it as the open writer, prime the response range, then continue in openWriteMain. */
+int
+CacheVC::openWriteInit(int eid, Event *event)
+{
+  Debug("amc", "[openWriteInit] vc=%p", this);
+  {
+    CACHE_TRY_LOCK(lock, od->mutex, mutex->thread_holding);
+    if (!lock.is_locked()) {
+      trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), eid);
+      return EVENT_CONT;
+    }
+
+    if (alternate.valid() && earliest_key != alternate.object_key_get()) {
+      // When the VC is created it sets up for a new alternate write. If we're back filling we
+      // need to tweak that back to the existing alternate.
+      Debug("amc", "[CacheVC::openWriteInit] updating earliest key from alternate");
+      alternate.object_key_get(&earliest_key);
+    }
+    // Get synchronized with the OD vector. NOTE(review): this dereferences write_vector, which is
+    // presumably only set for HTTP writes - confirm for the non-HTTP open_write path.
+    if (CACHE_ALT_INDEX_DEFAULT == (alternate_index = get_alternate_index(write_vector, earliest_key))) {
+      Debug("amc", "[openWriteInit] alt not found, inserted");
+      alternate_index = write_vector->insert(&alternate); // not there, add it
+    } else {
+      HTTPInfo *base = write_vector->get(alternate_index);
+      if (!base->is_writeable()) {
+        // The alternate instance is mapped directly on a read buffer, which we can't modify.
+        // It must be replaced with a live, mutable one.
+        alternate.copy(base); // make a local, mutable copy
+        Debug("amc", "Updating OD vector element %d : %p with mutable version %p", alternate_index, base, alternate.m_alt);
+        base->copy_shallow(&alternate); // paste the mutable copy back.
+      }
+    }
+    // Register this VC as a writer of the alternate and share the vector's (mutable) info.
+    write_vector->data[alternate_index]._writers.push(this);
+    alternate.copy_shallow(write_vector->get(alternate_index));
+    // If this VC is the open writer that readers were parked on, wake them up now.
+    if (this == od->open_writer) {
+      od->open_writer = NULL;
+      CacheVC *reader;
+      while (NULL != (reader = od->open_waiting.pop())) {
+        Debug("amc", "[CacheVC::openWriteInit] wake up %p", reader);
+        reader->wake_up_thread->schedule_imm(reader);
+      }
+    }
+  }
+
+  if (resp_range.hasRanges()) {
+    resp_range.start();
+    // write_pos / fragment / key are derived from the range later, by openWriteMain.
+  }
+
+  // Nothing else to set up: switch to the main write loop and run it immediately,
+  // forwarding the same eid / event this handler received.
+  SET_HANDLER(&CacheVC::openWriteMain);
+  return openWriteMain(eid, event);
+}
+
int
CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
int called_user = 0;
ink_assert(!is_io_in_progress());
+ Debug("amc", "[CacheVC::openWriteMain]");
Lagain:
if (!vio.buffer.writer()) {
if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
@@ -1353,10 +1461,16 @@ Lagain:
if (vio.ntodo() <= 0)
return EVENT_CONT;
}
+
int64_t ntodo = (int64_t)(vio.ntodo() + length);
int64_t total_avail = vio.buffer.reader()->read_avail();
int64_t avail = total_avail;
int64_t towrite = avail + length;
+ int64_t ffs = cacheProcessor.get_fixed_fragment_size();
+
+ Debug("amc", "[CacheVC::openWriteMain] ntodo=%" PRId64 " avail=%" PRId64 " towrite=%" PRId64 " frag=%d", ntodo, avail, towrite,
+ fragment);
+
if (towrite > ntodo) {
avail -= (towrite - ntodo);
towrite = ntodo;
@@ -1369,17 +1483,19 @@ Lagain:
blocks = vio.buffer.reader()->block;
offset = vio.buffer.reader()->start_offset;
}
+
if (avail > 0) {
vio.buffer.reader()->consume(avail);
vio.ndone += avail;
total_len += avail;
}
length = (uint64_t)towrite;
- if (length > target_fragment_size() && (length < target_fragment_size() + target_fragment_size() / 4))
- write_len = target_fragment_size();
+ // [amc] Need to change this to be exactly the fixed fragment size for this alternate.
+ if (length > ffs && (length < ffs + ffs / 4))
+ write_len = ffs;
else
write_len = length;
- bool not_writing = towrite != ntodo && towrite < target_fragment_size();
+ bool not_writing = towrite != ntodo && towrite < ffs;
if (!called_user) {
if (not_writing) {
called_user = 1;
@@ -1391,12 +1507,61 @@ Lagain:
}
if (not_writing)
return EVENT_CONT;
+
+ this->updateWriteStateFromRange();
+
+ {
+ CacheHTTPInfo *alt = &alternate;
+ SCOPED_MUTEX_LOCK(lock, od->mutex, this_ethread());
+
+#if 0
+ alternate_index = get_alternate_index(write_vector, earliest_key);
+ if (alternate_index < 0)
+ alternate_index = write_vector->insert(&alternate, alternate_index);
+
+ alt = write_vector->get(alternate_index);
+#endif
+
+ if (fragment != 0 && !alt->m_alt->m_earliest.m_flag.cached_p) {
+ SET_HANDLER(&CacheVC::openWriteEmptyEarliestDone);
+ if (!od->is_write_active(earliest_key, 0)) {
+ write_len = 0;
+ key = earliest_key;
+ Debug("amc", "[CacheVC::openWriteMain] writing empty earliest");
+ } else {
+ // go on the wait list
+ od->wait_for(earliest_key, this, 0);
+ not_writing = true;
+ }
+ } else if (od->is_write_active(earliest_key, write_pos)) {
+ od->wait_for(earliest_key, this, write_pos);
+ not_writing = true;
+ } else if (alternate.is_frag_cached(fragment)) {
+ not_writing = true;
+ Debug("amc", "Fragment %d already cached", fragment);
+ // Consume the data, as we won't be using it.
+ resp_range.consume(write_len);
+ blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
+ // need to kick start things again or we'll stall.
+ return this->handleEvent(EVENT_IMMEDIATE);
+ } else {
+ od->write_active(earliest_key, this, write_pos);
+ }
+ }
+
+ if (0 == write_len) // need to set up the write not under OpenDir lock.
+ return do_write_lock_call();
+
if (towrite == ntodo && f.close_complete) {
closed = 1;
SET_HANDLER(&CacheVC::openWriteClose);
return openWriteClose(EVENT_NONE, NULL);
+ } else if (not_writing) {
+ return EVENT_CONT;
}
+
SET_HANDLER(&CacheVC::openWriteWriteDone);
+ Debug("amc", "[CacheVC::openWriteMain] doing write call");
return do_write_lock_call();
}
@@ -1434,7 +1599,7 @@ Lcollision : {
}
}
Ldone:
- SET_HANDLER(&CacheVC::openWriteMain);
+ SET_HANDLER(&CacheVC::openWriteInit);
return callcont(CACHE_EVENT_OPEN_WRITE);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
@@ -1458,7 +1623,7 @@ CacheVC::openWriteStartDone(int event, Event *e)
if (!lock.is_locked())
VC_LOCK_RETRY_EVENT();
- if (_action.cancelled && (!od || !od->has_multiple_writers()))
+ if (_action.cancelled && (!od /* || !od->has_multiple_writers() */))
goto Lcancel;
if (event == AIO_EVENT_DONE) { // vector read done
@@ -1501,15 +1666,17 @@ CacheVC::openWriteStartDone(int event, Event *e)
}
Lcollision:
- int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
+ // int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
if (!od) {
- if ((err = vol->open_write(this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
+ if ((err = vol->open_write(this)) > 0)
goto Lfailure;
- if (od->has_multiple_writers()) {
- MUTEX_RELEASE(lock);
- SET_HANDLER(&CacheVC::openWriteMain);
- return callcont(CACHE_EVENT_OPEN_WRITE);
- }
+ /*
+ if (od->has_multiple_writers()) {
+ MUTEX_RELEASE(lock);
+ SET_HANDLER(&CacheVC::openWriteInit);
+ return this->openWriteInit(EVENT_IMMEDIATE, 0);
+ }
+ */
}
// check for collision
if (dir_probe(&first_key, vol, &dir, &last_collision)) {
@@ -1528,8 +1695,9 @@ Lsuccess:
od->reading_vec = 0;
if (_action.cancelled)
goto Lcancel;
- SET_HANDLER(&CacheVC::openWriteMain);
+ SET_HANDLER(&CacheVC::openWriteInit);
return callcont(CACHE_EVENT_OPEN_WRITE);
+// return this->openWriteInit(EVENT_IMMEDIATE, 0);
Lfailure:
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
@@ -1553,7 +1721,7 @@ CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED
cancel_trigger();
if (_action.cancelled)
return free_CacheVC(this);
- if (((err = vol->open_write_lock(this, false, 1)) > 0)) {
+ if (((err = vol->open_write_lock(this)) > 0)) {
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
free_CacheVC(this);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
@@ -1566,7 +1734,7 @@ CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED
return openWriteOverwrite(EVENT_IMMEDIATE, 0);
} else {
// write by key
- SET_HANDLER(&CacheVC::openWriteMain);
+ SET_HANDLER(&CacheVC::openWriteInit);
return callcont(CACHE_EVENT_OPEN_WRITE);
}
}
@@ -1613,7 +1781,7 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_ty
c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
c->pin_in_cache = (uint32_t)apin_in_cache;
- if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
+ if ((res = c->vol->open_write_lock(c)) > 0) {
// document currently being written, abort
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-res);
@@ -1626,9 +1794,10 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_ty
return &c->_action;
}
if (!c->f.overwrite) {
- SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteInit);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
+ // return c->do_write_init();
} else {
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
if (c->openWriteOverwrite(EVENT_IMMEDIATE, 0) == EVENT_DONE)
@@ -1638,6 +1807,22 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_ty
}
}
+int
+CacheVC::updateWriteStateFromRange() // sync write_pos, fragment and key with the response range's current offset
+{
+  if (resp_range.hasPendingRangeShift()) // presumably a pending switch to the next range - applied first
+    resp_range.consumeRangeShift();
+  write_pos = resp_range.getOffset();
+  fragment = alternate.get_frag_index_of(write_pos);
+  key = alternate.get_frag_key(fragment);
+  {
+    char tmp[64];
+    Debug("amc", "[writeMain] pos=%" PRId64 " frag=%d/%" PRId64 " key=%s", write_pos, fragment, alternate.get_frag_offset(fragment),
+          key.toHexStr(tmp));
+  }
+  return write_pos; // NOTE(review): int64 write_pos is narrowed to the int return; visible callers ignore the value
+}
+
#ifdef HTTP_CACHE
// main entry point for writing of http documents
Action *
@@ -1651,22 +1836,28 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info,
ink_assert(caches[type] == this);
intptr_t err = 0;
- int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
+ // int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
CacheVC *c = new_CacheVC(cont);
ProxyMutex *mutex = cont->mutex;
c->vio.op = VIO::WRITE;
c->first_key = *key;
- /*
- The transition from single fragment document to a multi-fragment document
- would cause a problem if the key and the first_key collide. In case of
- a collision, old vector data could be served to HTTP. Need to avoid that.
- Also, when evacuating a fragment, we have to decide if its the first_key
- or the earliest_key based on the dir_tag.
- */
- do {
- rand_CacheKey(&c->key, cont->mutex);
- } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
- c->earliest_key = c->key;
+ if (info) {
+ info->object_key_get(&c->key);
+ c->earliest_key = c->key;
+ } else {
+ /*
+ The transition from single fragment document to a multi-fragment document
+ would cause a problem if the key and the first_key collide. In case of
+ a collision, old vector data could be served to HTTP. Need to avoid that.
+ Also, when evacuating a fragment, we have to decide if its the first_key
+ or the earliest_key based on the dir_tag.
+ */
+ do {
+ rand_CacheKey(&c->key, cont->mutex);
+ } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
+ c->earliest_key = c->key;
+ }
+
c->frag_type = CACHE_FRAG_TYPE_HTTP;
c->vol = key_to_vol(key, hostname, host_len);
Vol *vol = c->vol;
@@ -1715,13 +1906,15 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info,
{
CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
if (lock.is_locked()) {
- if ((err = c->vol->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
+ if ((err = c->vol->open_write(c)) > 0)
goto Lfailure;
// If there are multiple writers, then this one cannot be an update.
// Only the first writer can do an update. If that's the case, we can
// return success to the state machine now.;
- if (c->od->has_multiple_writers())
- goto Lmiss;
+ /*
+ if (c->od->has_multiple_writers())
+ goto Lmiss;
+ */
if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
if (c->f.update) {
// fail update because vector has been GC'd
@@ -1730,9 +1923,12 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info,
goto Lfailure;
}
// document doesn't exist, begin write
+ ink_assert(NULL == c->od->open_writer);
+ c->od->open_writer = c;
goto Lmiss;
} else {
c->od->reading_vec = 1;
+ c->od->open_writer = c;
// document exists, read vector
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
switch (c->do_read_call(&c->first_key)) {
@@ -1752,7 +1948,8 @@ Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info,
}
Lmiss:
- SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
+ // return c->do_write_init();
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteInit);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/1c06db83/iocore/cache/I_Cache.h
----------------------------------------------------------------------
diff --git a/iocore/cache/I_Cache.h b/iocore/cache/I_Cache.h
index 77aafe9..e42ddf5 100644
--- a/iocore/cache/I_Cache.h
+++ b/iocore/cache/I_Cache.h
@@ -49,6 +49,7 @@
#define CACHE_COMPRESSION_LIBZ 2
#define CACHE_COMPRESSION_LIBLZMA 3
+struct CacheVConnection;
struct CacheVC;
struct CacheDisk;
#ifdef HTTP_CACHE
@@ -56,6 +57,7 @@ class CacheLookupHttpConfig;
class URL;
class HTTPHdr;
class HTTPInfo;
+class HTTPRangeSpec;
typedef HTTPHdr CacheHTTPHdr;
typedef URL CacheURL;
@@ -80,6 +82,17 @@ struct CacheProcessor : public Processor {
CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, const char *hostname = 0, int host_len = 0);
inkcoreapi Action *open_read(Continuation *cont, const CacheKey *key, bool cluster_cache_local,
CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, const char *hostname = 0, int host_len = 0);
+
+ /** Open a cache reader from an already open writer.
+
+ This is used for partial content on a cache miss to open a reader corresponding to the
+ partial content writer.
+ */
+ inkcoreapi Action *open_read(Continuation *cont, CacheVConnection *writer, HTTPHdr *client_request_hdr);
+
+ Action *open_read_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key, CacheFragType frag_type = CACHE_FRAG_TYPE_NONE,
+ char *hostname = 0, int host_len = 0);
+
inkcoreapi Action *open_write(Continuation *cont, CacheKey *key, bool cluster_cache_local,
CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, int expected_size = CACHE_EXPECTED_SIZE,
int options = 0, time_t pin_in_cache = (time_t)0, char *hostname = 0, int host_len = 0);
@@ -124,6 +137,9 @@ struct CacheProcessor : public Processor {
*/
bool has_online_storage() const;
+ /** Get the target fragment size. */
+ int64_t get_fixed_fragment_size() const;
+
static int IsCacheEnabled();
static bool IsCacheReady(CacheFragType type);
@@ -189,6 +205,62 @@ struct CacheVConnection : public VConnection {
#ifdef HTTP_CACHE
virtual void set_http_info(CacheHTTPInfo *info) = 0;
virtual void get_http_info(CacheHTTPInfo **info) = 0;
+
+ /** Get the boundary string for a multi-part range response.
+ The length of the string is returned in @a len.
+
+      @return A pointer to the string.
+ */
+ virtual char const *get_http_range_boundary_string(int *len) const = 0;
+
+ /** Get the effective content size.
+
+ This is the amount of actual data based on any range or framing. Effectively this is the
+ value to be passed to the @c VIO while the content length is used in the HTTP header.
+ */
+ virtual int64_t get_effective_content_size() = 0;
+
+ /** Set the origin reported content size.
+
+ This is the content length reported by the origin server and should be considered a hint, not
+ definitive. The object size, as stored in the cache, is the actual amount of data received and
+ cached.
+
+ @note This is the total content length as reported in the HTTP header, not the partial (range based) response size.
+ Also this is the length of the HTTP content, which may differ from the size of the data stream.
+ */
+ virtual void set_full_content_length(int64_t) = 0;
+
+ /** Set the output ranges for the content.
+ */
+ virtual void set_content_range(HTTPRangeSpec const &range) = 0;
+
+ /// Get the unchanged ranges for the request range @a req.
+ /// If @a req is empty it is treated as a full request (non-partial).
+ /// @return @c true if the @a result is not empty.
+ /// @internal Currently this just returns the single range that is convex hull of the uncached request.
+ /// Someday we may want to do the exact range spec but we use the type for now because it's easier.
+  virtual bool
+  get_uncached(HTTPRangeSpec const &req, HTTPRangeSpec &result, int64_t initial) // base version: no range information
+  {
+    (void)req;     // unused in the base implementation
+    (void)result;  // left untouched
+    (void)initial; // unused in the base implementation
+    return false;  // i.e. the result is reported as empty
+  }
+
+ /** Set the range for the input (response content).
+ The incoming bytes will be written to this section of the object.
+ @note This range @b must be absolute.
+ @note The range is inclusive.
+ @return The # of bytes in the range.
+ */
+  virtual int64_t
+  set_inbound_range(int64_t min, int64_t max) // base version records nothing, only sizes the range
+  {
+    return 1 + (max - min); // byte count of the inclusive range [min, max]
+  }
+
#endif
+ /// NOTE(review): presumably reports whether the content was served from the RAM cache — confirm with implementers.
virtual bool is_ram_cache_hit() const = 0;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/1c06db83/iocore/cache/I_CacheDefs.h
----------------------------------------------------------------------
diff --git a/iocore/cache/I_CacheDefs.h b/iocore/cache/I_CacheDefs.h
index 941ff0e..02ce264 100644
--- a/iocore/cache/I_CacheDefs.h
+++ b/iocore/cache/I_CacheDefs.h
@@ -21,6 +21,7 @@
limitations under the License.
*/
+#include <vector>
#ifndef _I_CACHE_DEFS_H__
#define _I_CACHE_DEFS_H__
@@ -32,7 +33,7 @@
#define CACHE_ALT_INDEX_DEFAULT -1
#define CACHE_ALT_REMOVED -2
-#define CACHE_DB_MAJOR_VERSION 24
+// NOTE(review): bumped from 24 in the same change that gives cache_bc its own HTTPCacheAlt_v23/_v24
+// definitions; presumably because the stored alternate layout changed — confirm.
+#define CACHE_DB_MAJOR_VERSION 25
#define CACHE_DB_MINOR_VERSION 0
#define CACHE_DIR_MAJOR_VERSION 18
@@ -144,4 +145,5 @@ struct HttpCacheKey {
word(2) - tag (lower bits), hosttable hash (upper bits)
word(3) - ram cache hash, lookaside cache
*/
+
#endif // __CACHE_DEFS_H__
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/1c06db83/iocore/cache/P_CacheBC.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheBC.h b/iocore/cache/P_CacheBC.h
index 2164692..2ffd4e8 100644
--- a/iocore/cache/P_CacheBC.h
+++ b/iocore/cache/P_CacheBC.h
@@ -33,9 +33,9 @@ namespace cache_bc
*/
+// NOTE(review): these _v21/_v23 names alias the current types, which presumably assumes their
+// layouts are unchanged since those versions — confirm before changing the aliased types.
typedef HTTPHdr HTTPHdr_v21;
+typedef HTTPHdr HTTPHdr_v23;
typedef HdrHeap HdrHeap_v23;
typedef CryptoHash CryptoHash_v23;
-typedef HTTPCacheAlt HTTPCacheAlt_v23;
+// HTTPCacheAlt_v23 is no longer an alias; it is defined as a separate struct later in this file.
/** Cache backwards compatibility structure - the fragment table.
This is copied from @c HTTPCacheAlt in @c HTTP.h.
@@ -120,6 +120,34 @@ struct Doc_v23 {
size_t data_len();
};
+/** Layout of the alternate information (@c HTTPCacheAlt) as of cache version 23.
+
+ Previously @c HTTPCacheAlt_v23 was a plain alias for @c HTTPCacheAlt. It is now spelled out so
+ that it no longer follows changes made to @c HTTPCacheAlt.
+ NOTE(review): presumably used to read alternates written by older cache versions, in which case
+ member order, types and sizes must stay exactly as they are — confirm against the cache_bc users.
+ */
+struct HTTPCacheAlt_v23 {
+ uint32_t m_magic;
+ int32_t m_writeable;
+ int32_t m_unmarshal_len;
+
+ int32_t m_id;
+ int32_t m_rid;
+
+ int32_t m_object_key[4];
+ int32_t m_object_size[2]; // NOTE(review): presumably a 64-bit size stored as two 32-bit words — confirm.
+
+ HTTPHdr_v23 m_request_hdr;
+ HTTPHdr_v23 m_response_hdr;
+
+ time_t m_request_sent_time;
+ time_t m_response_received_time;
+
+ int m_frag_offset_count;
+ typedef uint64_t FragOffset;
+ FragOffset *m_frag_offsets;
+ static int const N_INTEGRAL_FRAG_OFFSETS = 4;
+ FragOffset m_integral_frag_offsets[N_INTEGRAL_FRAG_OFFSETS];
+
+ RefCountObj *m_ext_buffer;
+};
+
+typedef HTTPCacheAlt_v23 HTTPCacheAlt_v24; // no changes between these versions.
+
static size_t const sizeofDoc_v23 = sizeof(Doc_v23);
char *
Doc_v23::data()
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/1c06db83/iocore/cache/P_CacheDir.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheDir.h b/iocore/cache/P_CacheDir.h
index c128537..36cf51d 100644
--- a/iocore/cache/P_CacheDir.h
+++ b/iocore/cache/P_CacheDir.h
@@ -213,48 +213,79 @@ struct FreeDir {
#define dir_prev(_e) (_e)->w[2]
#define dir_set_prev(_e, _o) (_e)->w[2] = (uint16_t)(_o)
-// INKqa11166 - Cache can not store 2 HTTP alternates simultaneously.
-// To allow this, move the vector from the CacheVC to the OpenDirEntry.
-// Each CacheVC now maintains a pointer to this vector. Adding/Deleting
-// alternates from this vector is done under the Vol::lock. The alternate
-// is deleted/inserted into the vector just before writing the vector disk
-// (CacheVC::updateVector).
-LINK_FORWARD_DECLARATION(CacheVC, opendir_link) // forward declaration
+/** Live in-memory state for an open object (identified by @a first_key).
+
+ Holds the alternate vector for the object, the directory entries used when writing it back, and
+ the bookkeeping used to coordinate the @c CacheVC instances working with the object.
+ */
struct OpenDirEntry {
- DLL<CacheVC, Link_CacheVC_opendir_link> writers; // list of all the current writers
- DLL<CacheVC, Link_CacheVC_opendir_link> readers; // list of all the current readers - not used
- CacheHTTPInfoVector vector; // Vector for the http document. Each writer
- // maintains a pointer to this vector and
- // writes it down to disk.
- CacheKey single_doc_key; // Key for the resident alternate.
- Dir single_doc_dir; // Directory for the resident alternate
- Dir first_dir; // Dir for the vector. If empty, a new dir is
- // inserted, otherwise this dir is overwritten
- uint16_t num_writers; // num of current writers
- uint16_t max_writers; // max number of simultaneous writers allowed
- bool dont_update_directory; // if set, the first_dir is not updated.
- bool move_resident_alt; // if set, single_doc_dir is inserted.
- volatile bool reading_vec; // somebody is currently reading the vector
- volatile bool writing_vec; // somebody is currently writing the vector
+ typedef OpenDirEntry self; ///< Self reference type.
+
+ Ptr<ProxyMutex> mutex;
+
+ /// Vector for the http document. Each writer maintains a pointer to this vector and writes it down to disk.
+ CacheHTTPInfoVector vector;
+ CacheKey first_key; ///< Key for first doc for this object.
+ CacheKey single_doc_key; // Key for the resident alternate.
+ Dir single_doc_dir; // Directory for the resident alternate
+ Dir first_dir; // Dir for the vector. If empty, a new dir is
+ // inserted, otherwise this dir is overwritten
+ uint16_t num_active; // num of VCs working with this entry
+ uint16_t max_writers; // max number of simultaneous writers allowed
+ bool dont_update_directory; // if set, the first_dir is not updated.
+ bool move_resident_alt; // if set, single_doc_dir is inserted.
+ volatile bool reading_vec; // somebody is currently reading the vector
+ volatile bool writing_vec; // somebody is currently writing the vector
+
+ /** Set to a write @c CacheVC that has started but not yet updated the vector.
+
+ If this is set then there is a write @c CacheVC that is active but has not yet been able to
+ update the vector for its alternate. Any new reader should block on open if this is set and
+ enter itself on the @a open_waiting list, making this effectively a write lock on the object.
+ This is necessary because we can't reliably do alternate selection in this state. The waiting
+ read @c CacheVC instances are released as soon as the vector is updated, they do not have to
+ wait until the write @c CacheVC has finished its transaction. In practice this means until the
+ server response has been received and processed.
+
+ @note NOTE(review): as declared, @c volatile qualifies the pointed-to @c CacheVC, not the pointer
+ itself; if a volatile pointer was intended this should be @c CacheVC @c *volatile. Either way
+ @c volatile does not synchronize threads — presumably access is guarded by @a mutex; confirm.
+ */
+ volatile CacheVC *open_writer;
+ /** A list of @c CacheVC instances that are waiting for the @a open_writer.
+ */
+ DLL<CacheVC, Link_CacheVC_Active_Link> open_waiting;
LINK(OpenDirEntry, link);
- int wait(CacheVC *c, int msec);
-
- bool
- has_multiple_writers()
- {
- return num_writers > 1;
- }
+ // int wait(CacheVC *c, int msec);
+
+ /// Get the alternate index for the @a key.
+ int index_of(CacheKey const &key);
+ /// Check if there are any writers for the alternate of @a alt_key.
+ bool has_writer(CacheKey const &alt_key);
+ /// Mark a @c CacheVC as actively writing at @a offset on the alternate with @a alt_key.
+ self &write_active(CacheKey const &alt_key, CacheVC *vc, int64_t offset);
+ /// Mark an active write by @a vc as complete and indicate whether it had @a success.
+ /// If the write is not @a success then the fragment is not marked as cached.
+ self &write_complete(CacheKey const &alt_key, CacheVC *vc, bool success = true);
+ /// Indicate if a VC is currently writing to the fragment with this @a offset.
+ bool is_write_active(CacheKey const &alt_key, int64_t offset);
+ /// Get the fragment key for a specific @a offset.
+ CacheKey const &key_for(CacheKey const &alt_key, int64_t offset);
+ /** Wait for a fragment to be written.
+
+ @return @c false if there is no writer that is scheduled to write that fragment.
+ */
+ bool wait_for(CacheKey const &alt_key, CacheVC *vc, int64_t offset);
+ /// Close out anything related to this writer
+ self &close_writer(CacheKey const &alt_key, CacheVC *vc);
};
struct OpenDir : public Continuation {
- Queue<CacheVC, Link_CacheVC_opendir_link> delayed_readers;
+ typedef Queue<CacheVC, Link_CacheVC_OpenDir_Link> CacheVCQ;
+ CacheVCQ delayed_readers;
+
DLL<OpenDirEntry> bucket[OPEN_DIR_BUCKETS];
- int open_write(CacheVC *c, int allow_if_writers, int max_writers);
- int close_write(CacheVC *c);
- OpenDirEntry *open_read(const CryptoHash *key);
+ /** Open a live directory entry for @a vc.
+
+ @a force_p is set to @c true to force the entry if it's not already there.
+ */
+ OpenDirEntry *open_entry(Vol *vol, CryptoHash const &key, bool force_p = false);
+ void close_entry(CacheVC *c);
+ // OpenDirEntry *open_read(CryptoHash *key);
int signal_readers(int event, Event *e);
OpenDir();