You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2015/04/19 19:48:01 UTC
[3/3] trafficserver git commit: TS-974: Base implmentation.
TS-974: Base implmentation.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/799a83bb
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/799a83bb
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/799a83bb
Branch: refs/heads/ts-974-5-3-x
Commit: 799a83bb4bdf8a34b5716289e6b9e2edc60772cf
Parents: 98a2e1b
Author: Alan M. Carroll <so...@yahoo-inc.com>
Authored: Wed Mar 25 20:13:14 2015 -0500
Committer: Alan M. Carroll <am...@apache.org>
Committed: Sun Apr 19 12:46:57 2015 -0500
----------------------------------------------------------------------
doc/arch/cache/cache-data-structures.en.rst | 3 +-
iocore/cache/Cache.cc | 75 ++-
iocore/cache/CacheDir.cc | 92 ++-
iocore/cache/CacheHttp.cc | 310 +++++++--
iocore/cache/CacheRead.cc | 559 ++++++++++------
iocore/cache/CacheWrite.cc | 331 +++++++---
iocore/cache/I_Cache.h | 90 ++-
iocore/cache/I_CacheDefs.h | 4 +-
iocore/cache/P_CacheBC.h | 30 +-
iocore/cache/P_CacheDir.h | 79 ++-
iocore/cache/P_CacheHttp.h | 277 +++++++-
iocore/cache/P_CacheInternal.h | 117 +++-
iocore/cluster/ClusterCache.cc | 2 +-
iocore/cluster/ClusterVConnection.cc | 1 +
iocore/cluster/P_Cluster.h | 3 +-
iocore/cluster/P_ClusterCache.h | 12 +
lib/ts/CryptoHash.h | 47 +-
lib/ts/InkErrno.h | 38 +-
lib/ts/ParseRules.cc | 17 +
lib/ts/ParseRules.h | 20 +
lib/ts/TsBuffer.h | 97 ++-
lib/ts/ink_code.cc | 74 +++
proxy/hdrs/HTTP.cc | 757 ++++++++++++++++++----
proxy/hdrs/HTTP.h | 786 ++++++++++++++++++++---
proxy/http/HttpCacheSM.cc | 25 +
proxy/http/HttpCacheSM.h | 9 +-
proxy/http/HttpDebugNames.cc | 2 +
proxy/http/HttpSM.cc | 132 +++-
proxy/http/HttpSM.h | 8 +-
proxy/http/HttpTransact.cc | 204 ++++--
proxy/http/HttpTransact.h | 94 ++-
proxy/http/HttpTransactHeaders.cc | 14 +
proxy/http/HttpTransactHeaders.h | 1 +
33 files changed, 3512 insertions(+), 798 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/799a83bb/doc/arch/cache/cache-data-structures.en.rst
----------------------------------------------------------------------
diff --git a/doc/arch/cache/cache-data-structures.en.rst b/doc/arch/cache/cache-data-structures.en.rst
index 1158051..b172999 100644
--- a/doc/arch/cache/cache-data-structures.en.rst
+++ b/doc/arch/cache/cache-data-structures.en.rst
@@ -24,7 +24,8 @@ Cache Data Structures
.. cpp:class:: OpenDir
- An open directory entry. It contains all the information of a
+ This represents an open directory entry. An entry is open when there is an active write on the object. Read operations do not of themselves require an `OpenDir` but if there is already one for the object it will be used by the read operation to coordinate with the write operations.
+
:cpp:class:`Dir` plus additional information from the first :cpp:class:`Doc`.
.. cpp:class:: CacheVC
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/799a83bb/iocore/cache/Cache.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
index d8e7bed..78f8d14 100644
--- a/iocore/cache/Cache.cc
+++ b/iocore/cache/Cache.cc
@@ -301,7 +301,7 @@ CacheVC::CacheVC() : alternate_index(CACHE_ALT_INDEX_DEFAULT)
}
#ifdef HTTP_CACHE
-HTTPInfo::FragOffset *
+HTTPInfo::FragmentDescriptorTable*
CacheVC::get_frag_table()
{
ink_assert(alternate.valid());
@@ -471,12 +471,30 @@ CacheVC::set_http_info(CacheHTTPInfo *ainfo)
} else
f.allow_empty_doc = 0;
alternate.copy_shallow(ainfo);
+ // This is not a good place to do this but I can't figure out a better one. We must do it
+ // no earlier than this, because there's no actual alternate to store the value in before this
+ // and I don't know of any later point that's guaranteed to be called before this is needed.
+ alternate.m_alt->m_fixed_fragment_size = cache_config_target_fragment_size - sizeofDoc;
ainfo->clear();
}
#endif
-bool
-CacheVC::set_pin_in_cache(time_t time_pin)
+int64_t
+CacheVC::set_inbound_range(int64_t min, int64_t max)
+{
+ resp_range.clear();
+ resp_range.getRangeSpec().add(min,max);
+ return 1 + (max - min);
+}
+
+void
+CacheVC::set_full_content_length(int64_t cl)
+{
+ alternate.object_size_set(cl);
+ resp_range.apply(cl);
+}
+
+bool CacheVC::set_pin_in_cache(time_t time_pin)
{
if (total_len) {
ink_assert(!"should Pin the document before writing");
@@ -491,7 +509,19 @@ CacheVC::set_pin_in_cache(time_t time_pin)
}
bool
-CacheVC::set_disk_io_priority(int priority)
+CacheVC::get_uncached(HTTPRangeSpec& result)
+{
+ HTTPRangeSpec::Range r = od ? write_vector->get_uncached_hull(earliest_key, resp_range.getRangeSpec())
+ : alternate.get_uncached_hull(resp_range.getRangeSpec())
+ ;
+ if (r.isValid()) {
+ result.add(r);
+ return true;
+ }
+ return false;
+}
+
+bool CacheVC::set_disk_io_priority(int priority)
{
ink_assert(priority >= AIO_LOWEST_PRIORITY);
io.aiocb.aio_reqprio = priority;
@@ -2359,6 +2389,19 @@ CacheVC::is_pread_capable()
return !f.read_from_writer_called;
}
+# if 0
+void
+CacheVC::get_missing_ranges(HTTPRangeSpec& missing)
+{
+ missing.reset();
+ if (0 == alternate.);
+ // For now we'll just compute the convex hull of the missing data.
+ for ( RangeBox::const_iterator spot = req.begin(), limit = req.end() ; spot != limit ; ++spot ) {
+
+ }
+}
+#endif
+
#define STORE_COLLISION 1
#ifdef HTTP_CACHE
@@ -2384,7 +2427,7 @@ unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay)
@internal I looked at doing this in place (rather than a copy & modify) but
- The in place logic would be even worse than this mess
- It wouldn't save you that much, since you end up doing inserts early in the buffer.
- Without extreme care in the logic it could end up doing more copying thatn
+ Without extreme care in the logic it could end up doing more copying than
the simpler copy & modify.
@internal This logic presumes the existence of some slack at the end of the buffer, which
@@ -2403,16 +2446,16 @@ upgrade_doc_version(Ptr<IOBufferData> &buf)
if (0 == doc->hlen) {
Debug("cache_bc", "Doc %p without header, no upgrade needed.", doc);
} else if (CACHE_FRAG_TYPE_HTTP_V23 == doc->doc_type) {
- cache_bc::HTTPCacheAlt_v21 *alt = reinterpret_cast<cache_bc::HTTPCacheAlt_v21 *>(doc->hdr());
+ typedef cache_bc::HTTPCacheFragmentTable::FragOffset FragOffset;
+ cache_bc::HTTPCacheAlt_v21* alt = reinterpret_cast<cache_bc::HTTPCacheAlt_v21*>(doc->hdr());
if (alt && alt->is_unmarshalled_format()) {
Ptr<IOBufferData> d_buf(ioDataAllocator.alloc());
- Doc *d_doc;
- char *src;
- char *dst;
- char *hdr_limit = doc->data();
- HTTPInfo::FragOffset *frags =
- reinterpret_cast<HTTPInfo::FragOffset *>(static_cast<char *>(buf->data()) + cache_bc::sizeofDoc_v23);
- int frag_count = doc->_flen / sizeof(HTTPInfo::FragOffset);
+ Doc* d_doc;
+ char* src;
+ char* dst;
+ char* hdr_limit = doc->data();
+ FragOffset* frags = reinterpret_cast<FragOffset*>(static_cast<char*>(buf->data()) + cache_bc::sizeofDoc_v23);
+ int frag_count = doc->_flen / sizeof(FragOffset);
size_t n = 0;
size_t content_size = doc->data_len();
@@ -2753,6 +2796,11 @@ LinterimRead:
io.action = this;
io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
SET_HANDLER(&CacheVC::handleReadDone);
+ {
+ char xt[33];
+ Debug("amc", "cache read : key = %s %" PRId64 " bytes at stripe offset =% " PRId64
+ , key.toHexStr(xt), io.aiocb.aio_nbytes, io.aiocb.aio_offset);
+ }
ink_assert(ink_aio_read(&io) >= 0);
CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
return EVENT_CONT;
@@ -3598,7 +3646,6 @@ CacheProcessor::open_read(Continuation *cont, URL *url, bool cluster_cache_local
return caches[type]->open_read(cont, url, request, params, type);
}
-
//----------------------------------------------------------------------------
Action *
CacheProcessor::open_write(Continuation *cont, int expected_size, URL *url, bool cluster_cache_local, CacheHTTPHdr *request,
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/799a83bb/iocore/cache/CacheDir.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheDir.cc b/iocore/cache/CacheDir.cc
index 779c16b..9bac5d8 100644
--- a/iocore/cache/CacheDir.cc
+++ b/iocore/cache/CacheDir.cc
@@ -69,28 +69,33 @@ OpenDir::OpenDir()
Returns 1 on success and 0 on failure.
*/
int
-OpenDir::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
+OpenDir::open_write(CacheVC *cont, int /* allow_if_writers */, int /* max_writers */)
{
ink_assert(cont->vol->mutex->thread_holding == this_ethread());
unsigned int h = cont->first_key.slice32(0);
int b = h % OPEN_DIR_BUCKETS;
for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) {
- if (!(d->writers.head->first_key == cont->first_key))
+ if (!(d->first_key == cont->first_key))
continue;
- if (allow_if_writers && d->num_writers < d->max_writers) {
- d->writers.push(cont);
+// if (allow_if_writers && d->num_writers < d->max_writers) {
+// d->writers.push(cont);
+ // [amc] Need to think if we want to track writers per object and not just per alt.
+ // useful to know when to close out the OpenDirEntry.
d->num_writers++;
cont->od = d;
cont->write_vector = &d->vector;
return 1;
- }
+// }
return 0;
}
- OpenDirEntry *od = THREAD_ALLOC(openDirEntryAllocator, cont->mutex->thread_holding);
- od->readers.head = NULL;
- od->writers.push(cont);
+ OpenDirEntry *od = THREAD_ALLOC(openDirEntryAllocator,
+ cont->mutex->thread_holding);
+// od->readers.head = NULL;
+// od->writers.push(cont);
+ od->mutex = new_ProxyMutex();
+ od->first_key = cont->first_key;
od->num_writers = 1;
- od->max_writers = max_writers;
+// od->max_writers = max_writers;
od->vector.data.data = &od->vector.data.fast_data[0];
od->dont_update_directory = 0;
od->move_resident_alt = 0;
@@ -106,7 +111,7 @@ OpenDir::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
int
OpenDir::signal_readers(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
- Queue<CacheVC, Link_CacheVC_opendir_link> newly_delayed_readers;
+ CacheVCQ newly_delayed_readers;
EThread *t = mutex->thread_holding;
CacheVC *c = NULL;
while ((c = delayed_readers.dequeue())) {
@@ -132,15 +137,15 @@ int
OpenDir::close_write(CacheVC *cont)
{
ink_assert(cont->vol->mutex->thread_holding == this_ethread());
- cont->od->writers.remove(cont);
- cont->od->num_writers--;
- if (!cont->od->writers.head) {
+// cont->od->writers.remove(cont);
+ if (--(cont->od->num_writers) < 1) {
unsigned int h = cont->first_key.slice32(0);
int b = h % OPEN_DIR_BUCKETS;
bucket[b].remove(cont->od);
- delayed_readers.append(cont->od->readers);
+// delayed_readers.append(cont->od->readers);
signal_readers(0, 0);
cont->od->vector.clear();
+ cont->od->mutex = 0;
THREAD_FREE(cont->od, openDirEntryAllocator, cont->mutex->thread_holding);
}
cont->od = NULL;
@@ -153,11 +158,12 @@ OpenDir::open_read(INK_MD5 *key)
unsigned int h = key->slice32(0);
int b = h % OPEN_DIR_BUCKETS;
for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next)
- if (d->writers.head->first_key == *key)
+ if (d->first_key == *key)
return d;
return NULL;
}
+# if 0
int
OpenDirEntry::wait(CacheVC *cont, int msec)
{
@@ -168,6 +174,62 @@ OpenDirEntry::wait(CacheVC *cont, int msec)
readers.push(cont);
return EVENT_CONT;
}
+# endif
+
+int
+OpenDirEntry::index_of(CacheKey const& alt_key)
+{
+ return vector.index_of(alt_key);
+}
+
+bool
+OpenDirEntry::has_writer(CacheKey const& alt_key)
+{
+ return vector.has_writer(alt_key);
+}
+
+OpenDirEntry&
+OpenDirEntry::write_active(CacheKey const& alt_key, CacheVC* vc, int64_t offset)
+{
+ Debug("amc", "VC %p write active @ %" PRId64, vc, offset);
+ vector.write_active(alt_key, vc, offset);
+ return *this;
+}
+
+OpenDirEntry&
+OpenDirEntry::write_complete(CacheKey const& alt_key, CacheVC* vc, bool success)
+{
+ Debug("amc", "[OpenDir::write_complete] VC %p write %s", vc, (success ? "succeeded" : "failed"));
+ vector.write_complete(alt_key, vc, success);
+ return *this;
+}
+
+bool
+OpenDirEntry::is_write_active(CacheKey const& alt_key, int64_t offset)
+{
+ return vector.is_write_active(alt_key, offset);
+}
+
+CacheKey const&
+OpenDirEntry::key_for(CacheKey const& alt_key, int64_t offset)
+{
+ return vector.key_for(alt_key, offset);
+}
+
+OpenDirEntry&
+OpenDirEntry::waiting_for(CacheKey const& alt_key, CacheVC* vc, int64_t offset)
+{
+ Debug("amc", "vc %p waiting for %" PRId64, vc, offset);
+ vector.waiting_for(alt_key, vc, offset);
+ return *this;
+}
+
+OpenDirEntry&
+OpenDirEntry::close_writer(CacheKey const& alt_key, CacheVC* vc)
+{
+ vector.close_writer(alt_key, vc);
+ return *this;
+}
//
// Cache Directory
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/799a83bb/iocore/cache/CacheHttp.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheHttp.cc b/iocore/cache/CacheHttp.cc
index 3d7dfba..97837ee 100644
--- a/iocore/cache/CacheHttp.cc
+++ b/iocore/cache/CacheHttp.cc
@@ -29,10 +29,10 @@
/*-------------------------------------------------------------------------
-------------------------------------------------------------------------*/
-static vec_info default_vec_info;
+// Guaranteed to be all zero?
+static CacheHTTPInfoVector::Item default_vec_info;
#ifdef HTTP_CACHE
-static CacheHTTPInfo default_http_info;
CacheHTTPInfoVector::CacheHTTPInfoVector() : magic(NULL), data(&default_vec_info, 4), xcount(0)
{
@@ -46,7 +46,7 @@ CacheHTTPInfoVector::~CacheHTTPInfoVector()
int i;
for (i = 0; i < xcount; i++) {
- data[i].alternate.destroy();
+ data[i]._alternate.destroy();
}
vector_buf.clear();
magic = NULL;
@@ -61,7 +61,7 @@ CacheHTTPInfoVector::insert(CacheHTTPInfo *info, int index)
if (index == CACHE_ALT_INDEX_DEFAULT)
index = xcount++;
- data(index).alternate.copy_shallow(info);
+ data(index)._alternate.copy_shallow(info);
return index;
}
@@ -77,8 +77,8 @@ CacheHTTPInfoVector::detach(int idx, CacheHTTPInfo *r)
ink_assert(idx >= 0);
ink_assert(idx < xcount);
- r->copy_shallow(&data[idx].alternate);
- data[idx].alternate.destroy();
+ r->copy_shallow(&data[idx]._alternate);
+ data[idx]._alternate.destroy();
for (i = idx; i < (xcount - 1); i++) {
data[i] = data[i + i];
@@ -94,7 +94,7 @@ void
CacheHTTPInfoVector::remove(int idx, bool destroy)
{
if (destroy)
- data[idx].alternate.destroy();
+ data[idx]._alternate.destroy();
for (; idx < (xcount - 1); idx++)
data[idx] = data[idx + 1];
@@ -112,7 +112,7 @@ CacheHTTPInfoVector::clear(bool destroy)
if (destroy) {
for (i = 0; i < xcount; i++) {
- data[i].alternate.destroy();
+ data[i]._alternate.destroy();
}
}
xcount = 0;
@@ -134,14 +134,14 @@ CacheHTTPInfoVector::print(char *buffer, size_t buf_size, bool temps)
purl = 1;
for (i = 0; i < xcount; i++) {
- if (data[i].alternate.valid()) {
+ if (data[i]._alternate.valid()) {
if (purl) {
Arena arena;
char *url;
purl = 0;
URL u;
- data[i].alternate.request_url_get(&u);
+ data[i]._alternate.request_url_get(&u);
url = u.string_get(&arena);
if (url) {
snprintf(p, buf_size, "[%s] ", url);
@@ -151,8 +151,9 @@ CacheHTTPInfoVector::print(char *buffer, size_t buf_size, bool temps)
}
}
- if (temps || !(data[i].alternate.object_key_get() == zero_key)) {
- snprintf(p, buf_size, "[%d %s]", data[i].alternate.id_get(), CacheKey(data[i].alternate.object_key_get()).toHexStr(buf));
+ if (temps || !(data[i]._alternate.object_key_get() == zero_key)) {
+ snprintf(p, buf_size, "[%d %s]", data[i]._alternate.id_get(),
+ CacheKey(data[i]._alternate.object_key_get()).toHexStr(buf));
tmp = strlen(p);
p += tmp;
buf_size -= tmp;
@@ -170,7 +171,7 @@ CacheHTTPInfoVector::marshal_length()
int length = 0;
for (int i = 0; i < xcount; i++) {
- length += data[i].alternate.marshal_length();
+ length += data[i]._alternate.marshal_length();
}
return length;
@@ -187,7 +188,7 @@ CacheHTTPInfoVector::marshal(char *buf, int length)
ink_assert(!(((intptr_t)buf) & 3)); // buf must be aligned
for (int i = 0; i < xcount; i++) {
- int tmp = data[i].alternate.marshal(buf, length);
+ int tmp = data[i]._alternate.marshal(buf, length);
length -= tmp;
buf += tmp;
count++;
@@ -199,8 +200,10 @@ CacheHTTPInfoVector::marshal(char *buf, int length)
return buf - start;
}
-int
-CacheHTTPInfoVector::unmarshal(const char *buf, int length, RefCountObj *block_ptr)
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+uint32_t
+CacheHTTPInfoVector::get_handles(const char *buf, int length, RefCountObj * block_ptr)
{
ink_assert(!(((intptr_t)buf) & 3)); // buf must be aligned
@@ -208,51 +211,280 @@ CacheHTTPInfoVector::unmarshal(const char *buf, int length, RefCountObj *block_p
CacheHTTPInfo info;
xcount = 0;
- while (length - (buf - start) > (int)sizeof(HTTPCacheAlt)) {
- int tmp = HTTPInfo::unmarshal((char *)buf, length - (buf - start), block_ptr);
+ vector_buf = block_ptr;
+
+ while (length - (buf - start) > (int) sizeof(HTTPCacheAlt)) {
+
+ int tmp = info.get_handle((char *) buf, length - (buf - start));
if (tmp < 0) {
- return -1;
+ ink_assert(!"CacheHTTPInfoVector::unmarshal get_handle() failed");
+ return (uint32_t) -1;
}
- info.m_alt = (HTTPCacheAlt *)buf;
buf += tmp;
- data(xcount).alternate = info;
+ data(xcount)._alternate = info;
xcount++;
}
return ((caddr_t)buf - (caddr_t)start);
}
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+int
+CacheHTTPInfoVector::index_of(CacheKey const& alt_key)
+{
+ int zret;
+ for ( zret = 0 ; zret < xcount && alt_key != data[zret]._alternate.object_key_get() ; ++zret )
+ ;
+ return zret < xcount ? zret : -1;
+}
/*-------------------------------------------------------------------------
-------------------------------------------------------------------------*/
-uint32_t
-CacheHTTPInfoVector::get_handles(const char *buf, int length, RefCountObj *block_ptr)
+
+CacheKey const&
+CacheHTTPInfoVector::key_for(CacheKey const& alt_key, int64_t offset)
{
- ink_assert(!(((intptr_t)buf) & 3)); // buf must be aligned
+ int idx = this->index_of(alt_key);
+ Item& item = data[idx];
+ return item._alternate.get_frag_key_of(offset);
+}
- const char *start = buf;
- CacheHTTPInfo info;
- xcount = 0;
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
- vector_buf = block_ptr;
+CacheHTTPInfoVector&
+CacheHTTPInfoVector::write_active(CacheKey const& alt_key, CacheVC* vc, int64_t offset)
+{
+ int idx = this->index_of(alt_key);
+ Item& item = data[idx];
- while (length - (buf - start) > (int)sizeof(HTTPCacheAlt)) {
- int tmp = info.get_handle((char *)buf, length - (buf - start));
- if (tmp < 0) {
- ink_assert(!"CacheHTTPInfoVector::unmarshal get_handle() failed");
- return (uint32_t)-1;
+ Debug("amc", "[CacheHTTPInfoVector::write_active] VC %p write %" PRId64, vc, offset);
+
+ vc->fragment = item._alternate.get_frag_index_of(offset);
+ item._active.push(vc);
+ return *this;
+}
+
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+CacheHTTPInfoVector&
+CacheHTTPInfoVector::write_complete(CacheKey const& alt_key, CacheVC* vc, bool success)
+{
+ int idx = this->index_of(alt_key);
+ Item& item = data[idx];
+ CacheVC* reader;
+
+ Debug("amc", "[CacheHTTPInfoVector::write_complete] VC %p write %s", vc, (success ? "succeeded" : "failed"));
+
+ item._active.remove(vc);
+ if (success) item._alternate.mark_frag_write(vc->fragment);
+
+ // Kick all the waiters, success or fail.
+ while (NULL != (reader = item._waiting.pop()))
+ eventProcessor.schedule_imm(reader);
+
+ return *this;
+}
+
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+bool
+CacheHTTPInfoVector::has_writer(CacheKey const& alt_key)
+{
+ int alt_idx = this->index_of(alt_key);
+ return alt_idx >= 0 && data[alt_idx]._writers.head != NULL;
+}
+
+bool
+CacheHTTPInfoVector::is_write_active(CacheKey const& alt_key, int64_t offset)
+{
+ int alt_idx = this->index_of(alt_key);
+ Item& item = data[alt_idx];
+ int frag_idx = item._alternate.get_frag_index_of(offset);
+ for ( CacheVC* vc = item._active.head ; vc ; vc = item._active.next(vc) ) {
+ if (vc->fragment == frag_idx) return true;
+ }
+ return false;
+}
+
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+CacheHTTPInfoVector&
+CacheHTTPInfoVector::waiting_for(CacheKey const& alt_key, CacheVC* vc, int64_t offset)
+{
+ int alt_idx = this->index_of(alt_key);
+ Item& item = data[alt_idx];
+ int frag_idx = item._alternate.get_frag_index_of(offset);
+ vc->fragment = frag_idx;
+ item._waiting.push(vc);
+ return *this;
+}
+
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+CacheHTTPInfoVector&
+CacheHTTPInfoVector::close_writer(CacheKey const& alt_key, CacheVC* vc)
+{
+ CacheVC* reader;
+ int alt_idx = this->index_of(alt_key);
+ Item& item = data[alt_idx];
+ item._writers.remove(vc);
+ while (NULL != (reader = item._waiting.pop())) {
+ Debug("amc", "[close_writer] wake up %p", reader);
+ reader->wake_up_thread->schedule_imm(reader);
+ }
+ return *this;
+}
+
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+HTTPRangeSpec::Range
+CacheHTTPInfoVector::get_uncached_hull(CacheKey const& alt_key, HTTPRangeSpec const& req)
+{
+ int alt_idx = this->index_of(alt_key);
+ Item& item = data[alt_idx];
+ Queue<CacheVC, Link_CacheVC_OpenDir_Link> writers;
+ CacheVC* vc;
+ CacheVC* cycle_vc = NULL;
+ // Yeah, this need to be tunable.
+ uint64_t DELTA = item._alternate.get_frag_fixed_size() * 16;
+ HTTPRangeSpec::Range r(item._alternate.get_uncached_hull(req));
+
+ if (r.isValid()) {
+ /* Now clip against the writers.
+ We move all the writers to a local list and move them back as we are done using them to clip.
+ This is so we don't skip a potentially valid writer because they are not in start order.
+ */
+ writers.append(item._writers);
+ item._writers.clear();
+ while (r._min < r._max && NULL != (vc = writers.pop())) {
+ uint64_t base = static_cast<int64_t>(writers.head->resp_range.getOffset());
+ uint64_t delta = static_cast<int64_t>(writers.head->resp_range.getRemnantSize());
+
+ if (base+delta < r._min || base > r._max) {
+ item._writers.push(vc); // of no use to us, just put it back.
+ } else if (base < r._min + DELTA) {
+ r._min = base + delta; // we can wait, so depend on this writer and clip.
+ item._writers.push(vc); // we're done with it, put it back.
+ cycle_vc = NULL; // we did something so clear cycle indicator
+ } else if (vc == cycle_vc) { // we're looping.
+ item._writers.push(vc); // put this one back.
+ while (NULL != (vc = writers.pop())) item._writers.push(vc); // and the rest.
+ } else {
+ writers.enqueue(vc); // put it back to later checking.
+ if (NULL == cycle_vc) cycle_vc = vc; // but keep an eye out for it coming around again.
+ }
}
- buf += tmp;
+ }
+ return r;
+}
- data(xcount).alternate = info;
- xcount++;
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+void
+CacheRange::clear()
+{
+ _offset = 0;
+ _idx = -1;
+ _pending_range_shift_p = false;
+ _ct_field = NULL; // need to do real cleanup at some point.
+ _r.clear();
+}
+
+bool
+CacheRange::init(HTTPHdr* req)
+{
+ bool zret = true;
+ MIMEField* rf = req->field_find(MIME_FIELD_RANGE, MIME_LEN_RANGE);
+ if (rf) {
+ int len;
+ char const* val = rf->value_get(&len);
+ zret = _r.parseRangeFieldValue(val, len);
}
+ return zret;
+}
- return ((caddr_t)buf - (caddr_t)start);
+bool
+CacheRange::start()
+{
+ bool zret = false;
+
+ if (_r.hasRanges()) {
+ _offset = _r[_idx = 0]._min;
+ _pending_range_shift_p = true;
+ zret = true;
+ }
+ return zret;
+}
+
+bool
+CacheRange::apply(uint64_t len)
+{
+ bool zret = _r.apply(len);
+ if (zret) {
+ _len = len;
+ if (_r.hasRanges()) {
+ _offset = _r[_idx = 0]._min;
+ if (_r.isMulti()) _pending_range_shift_p = true;
+ }
+ }
+ return zret;
+}
+
+uint64_t
+CacheRange::consume(uint64_t size)
+{
+ switch (_r._state) {
+ case HTTPRangeSpec::EMPTY: _offset += size; break;
+ case HTTPRangeSpec::SINGLE: _offset += std::min(size, (_r._single._max - _offset) + 1 ); break;
+ case HTTPRangeSpec::MULTI:
+ ink_assert(_idx < static_cast<int>(_r.count()));
+ // Must not consume more than 1 range or the boundary strings won't get sent.
+ ink_assert(!_pending_range_shift_p);
+ ink_assert(size <= (_r[_idx]._max - _offset) + 1);
+ _offset += size;
+ if (_offset > _r[_idx]._max && ++_idx < static_cast<int>(_r.count())) {
+ _offset = _r[_idx]._min;
+ _pending_range_shift_p = true;
+ }
+ break;
+ default: break;
+ }
+
+ return _offset;
}
-#else // HTTP_CACHE
+CacheRange&
+CacheRange::generateBoundaryStr(CacheKey const& key)
+{
+ uint64_t rnd = this_ethread()->generator.random();
+ snprintf(_boundary, sizeof(_boundary), "%016" PRIx64 "%016" PRIx64 "..%016" PRIx64, key.slice64(0), key.slice64(1), rnd);
+ // GAH! snprintf null terminates so we can't actually print the last nybble that way and all of
+ // the internal hex converters do the same thing. This is crazy code I need to fix at some point.
+ // It is critical to print every nybble or the content lengths won't add up.
+ _boundary[HTTP_RANGE_BOUNDARY_LEN-1] = "0123456789abcdef"[rnd & 0xf];
+ return *this;
+}
+
+uint64_t
+CacheRange::calcContentLength() const
+{
+ return _r.calcContentLength(_len, _ct_field ? _ct_field->m_len_value : 0);
+}
+
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
+#else //HTTP_CACHE
CacheHTTPInfoVector::CacheHTTPInfoVector() : data(&default_vec_info, 4), xcount(0)
{
@@ -348,5 +580,7 @@ CacheHTTPInfoVector::get_handles(const char * /* buf ATS_UNUSED */, int /* lengt
ink_assert(0);
return 0;
}
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
#endif // HTTP_CACHE
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/799a83bb/iocore/cache/CacheRead.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheRead.cc b/iocore/cache/CacheRead.cc
index 4ba02f4..6a41267 100644
--- a/iocore/cache/CacheRead.cc
+++ b/iocore/cache/CacheRead.cc
@@ -48,14 +48,14 @@ Cache::open_read(Continuation *cont, CacheKey *key, CacheFragType type, char *ho
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
c = new_CacheVC(cont);
- SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+ c->vol = vol;
+ c->first_key = c->key = c->earliest_key = *key;
c->vio.op = VIO::READ;
c->base_stat = cache_read_active_stat;
- CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
- c->first_key = c->key = c->earliest_key = *key;
- c->vol = vol;
- c->frag_type = type;
c->od = od;
+ c->frag_type = type;
+ CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
}
if (!c)
goto Lmiss;
@@ -92,6 +92,42 @@ Lcallreturn:
}
#ifdef HTTP_CACHE
+Action*
+Cache::open_read(Continuation* cont, CacheVConnection* vc, HTTPHdr* client_request_hdr)
+{
+ Action* zret = ACTION_RESULT_DONE;
+
+ CacheVC* write_vc = dynamic_cast<CacheVC*>(vc);
+ if (write_vc) {
+ Vol *vol = write_vc->vol;
+ ProxyMutex *mutex = cont->mutex; // needed for stat macros
+ CacheVC *c = new_CacheVC(cont);
+
+ c->vol = write_vc->vol;
+ c->first_key = write_vc->first_key;
+ c->earliest_key = c->key = write_vc->earliest_key;
+ c->vio.op = VIO::READ;
+ c->base_stat = cache_read_active_stat;
+ c->od = write_vc->od;
+ c->frag_type = write_vc->frag_type;
+ CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+// write_vc->alternate.request_get(&c->request);
+// client_request_hdr->copy_shallow(&c->request);
+ c->request.copy_shallow(client_request_hdr);
+ c->params = write_vc->params; // seems to be a no-op, always NULL.
+ c->dir = c->first_dir = write_vc->first_dir;
+ c->write_vc = write_vc;
+ c->first_buf = write_vc->first_buf; // I don't think this is effective either.
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+ zret = &c->_action; // default, override if needed.
+ CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+ if (lock.is_locked() && c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE) {
+ zret = ACTION_RESULT_DONE;
+ }
+ }
+ return zret;
+}
+
Action *
Cache::open_read(Continuation *cont, CacheKey *key, CacheHTTPHdr *request, CacheLookupHttpConfig *params, CacheFragType type,
char *hostname, int host_len)
@@ -112,15 +148,15 @@ Cache::open_read(Continuation *cont, CacheKey *key, CacheHTTPHdr *request, Cache
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
c = new_CacheVC(cont);
- c->first_key = c->key = c->earliest_key = *key;
c->vol = vol;
+ c->first_key = c->key = c->earliest_key = *key;
c->vio.op = VIO::READ;
c->base_stat = cache_read_active_stat;
+ c->od = od;
+ c->frag_type = CACHE_FRAG_TYPE_HTTP;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->request.copy_shallow(request);
- c->frag_type = CACHE_FRAG_TYPE_HTTP;
c->params = params;
- c->od = od;
}
if (!lock.is_locked()) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
@@ -166,17 +202,43 @@ uint32_t
CacheVC::load_http_info(CacheHTTPInfoVector *info, Doc *doc, RefCountObj *block_ptr)
{
uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
- if (cache_config_compatibility_4_2_0_fixup && // manual override not engaged
- !this->f.doc_from_ram_cache && // it's already been done for ram cache fragments
- vol->header->version.ink_major == 23 && vol->header->version.ink_minor == 0) {
- for (int i = info->xcount - 1; i >= 0; --i) {
- info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
- info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
+ if (zret != static_cast<uint32_t>(-1) && // Make sure we haven't already failed
+ cache_config_compatibility_4_2_0_fixup && // manual override not engaged
+ ! this->f.doc_from_ram_cache && // it's already been done for ram cache fragments
+ vol->header->version.ink_major == 23 && vol->header->version.ink_minor == 0
+ ) {
+ for ( int i = info->xcount - 1 ; i >= 0 ; --i ) {
+ info->data(i)._alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
+ info->data(i)._alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
}
}
return zret;
}
+char const*
+CacheVC::get_http_range_boundary_string(int* len) const
+{
+ return resp_range.getBoundaryStr(len);
+}
+
+int64_t
+CacheVC::get_effective_content_size()
+{
+ return this->is_http_partial_content() ? resp_range.calcContentLength() : alternate.object_size_get();
+}
+
+HTTPRangeSpec&
+CacheVC::get_http_range_spec()
+{
+ return resp_range.getRangeSpec();
+}
+
+bool
+CacheVC::is_http_partial_content()
+{
+ return resp_range.hasRanges();
+}
+
int
CacheVC::openReadFromWriterFailure(int event, Event *e)
{
@@ -192,24 +254,13 @@ CacheVC::openReadFromWriterFailure(int event, Event *e)
int
CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
- intptr_t err = ECACHE_DOC_BUSY;
- CacheVC *w = NULL;
+// intptr_t err = ECACHE_DOC_BUSY;
+// CacheVC *w = NULL;
ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == NULL);
- if (!od)
- return EVENT_RETURN;
-
if (frag_type != CACHE_FRAG_TYPE_HTTP) {
- ink_assert(od->num_writers == 1);
- w = od->writers.head;
- if (w->start_time > start_time || w->closed < 0) {
- od = NULL;
- return EVENT_RETURN;
- }
- if (!w->closed)
- return -err;
- write_vc = w;
+ ink_release_assert(! "[amc] Fix reader from writer for non-HTTP");
}
#ifdef HTTP_CACHE
else {
@@ -217,41 +268,8 @@ CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSE
int write_vec_cnt = write_vector->count();
for (int c = 0; c < write_vec_cnt; c++)
vector.insert(write_vector->get(c));
- // check if all the writers who came before this reader have
- // set the http_info.
- for (w = (CacheVC *)od->writers.head; w; w = (CacheVC *)w->opendir_link.next) {
- if (w->start_time > start_time || w->closed < 0)
- continue;
- if (!w->closed && !cache_config_read_while_writer) {
- return -err;
- }
- if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT)
- continue;
-
- if (!w->closed && !w->alternate.valid()) {
- od = NULL;
- ink_assert(!write_vc);
- vector.clear(false);
- return EVENT_CONT;
- }
- // construct the vector from the writers.
- int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
- if (w->f.update) {
- // all Update cases. Need to get the alternate index.
- alt_ndx = get_alternate_index(&vector, w->update_key);
- // if its an alternate delete
- if (!w->alternate.valid()) {
- if (alt_ndx >= 0)
- vector.remove(alt_ndx, false);
- continue;
- }
- }
- ink_assert(w->alternate.valid());
- if (w->alternate.valid())
- vector.insert(&w->alternate, alt_ndx);
- }
-
if (!vector.count()) {
+ // [amc] Not sure of the utility of this now
if (od->reading_vec) {
// the writer(s) are reading the vector, so there is probably
// an old vector. Since this reader came before any of the
@@ -267,22 +285,11 @@ CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSE
return -ECACHE_ALT_MISS;
} else
alternate_index = 0;
- CacheHTTPInfo *obj = vector.get(alternate_index);
- for (w = (CacheVC *)od->writers.head; w; w = (CacheVC *)w->opendir_link.next) {
- if (obj->m_alt == w->alternate.m_alt) {
- write_vc = w;
- break;
- }
- }
vector.clear(false);
- if (!write_vc) {
- DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
- od = NULL;
- return EVENT_RETURN;
- }
-
- DDebug("cache_read_agg", "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p", this, first_key.slice32(1),
- write_vc->earliest_key.slice32(1), vector.count(), alternate_index, od->num_writers, write_vc);
+ DDebug("cache_read_agg",
+ "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p",
+ this, first_key.slice32(1), write_vc->earliest_key.slice32(1),
+ vector.count(), alternate_index, od->num_writers, write_vc);
}
#endif // HTTP_CACHE
return EVENT_NONE;
@@ -324,14 +331,36 @@ CacheVC::openReadFromWriter(int event, Event *e)
return openReadStartHead(event, e);
} else
ink_assert(od == vol->open_read(&first_key));
- if (!write_vc) {
+
+ CACHE_TRY_LOCK(lock_od, od->mutex, mutex->thread_holding);
+ if (!lock_od.is_locked())
+ VC_SCHED_LOCK_RETRY();
+
+ if (od->open_writer) {
+ // Can't pick a writer yet, so wait for it.
+ if (!od->open_waiting.in(this)) {
+ wake_up_thread = mutex->thread_holding;
+ od->open_waiting.push(this);
+ }
+ Debug("amc", "[CacheVC::openReadFromWriter] waiting for %p", od->open_writer);
+ return EVENT_CONT; // wait for the writer to wake us up.
+ }
+
+ MUTEX_RELEASE(lock); // we have the OD lock now, don't need the vol lock.
+
+ if (write_vc && CACHE_ALT_INDEX_DEFAULT != (alternate_index = get_alternate_index(&(od->vector), earliest_key))) {
+ alternate.copy_shallow(od->vector.get(alternate_index));
+ MUTEX_RELEASE(lock_od);
+ SET_HANDLER(&CacheVC::openReadStartEarliest);
+ return openReadStartEarliest(event, e);
+ } else {
int ret = openReadChooseWriter(event, e);
if (ret < 0) {
- MUTEX_RELEASE(lock);
+ MUTEX_RELEASE(lock_od);
SET_HANDLER(&CacheVC::openReadFromWriterFailure);
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *>(ret));
} else if (ret == EVENT_RETURN) {
- MUTEX_RELEASE(lock);
+ MUTEX_RELEASE(lock_od);
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(event, e);
} else if (ret == EVENT_CONT) {
@@ -339,15 +368,6 @@ CacheVC::openReadFromWriter(int event, Event *e)
VC_SCHED_WRITER_RETRY();
} else
ink_assert(write_vc);
- } else {
- if (writer_done()) {
- MUTEX_RELEASE(lock);
- DDebug("cache_read_agg", "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
- od = NULL;
- write_vc = NULL;
- SET_HANDLER(&CacheVC::openReadStartHead);
- return openReadStartHead(event, e);
- }
}
#ifdef HTTP_CACHE
OpenDirEntry *cod = od;
@@ -355,7 +375,7 @@ CacheVC::openReadFromWriter(int event, Event *e)
od = NULL;
// someone is currently writing the document
if (write_vc->closed < 0) {
- MUTEX_RELEASE(lock);
+ MUTEX_RELEASE(lock_od);
write_vc = NULL;
// writer aborted, continue as if there is no writer
SET_HANDLER(&CacheVC::openReadStartHead);
@@ -365,8 +385,8 @@ CacheVC::openReadFromWriter(int event, Event *e)
ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
if (!write_vc->closed && !write_vc->fragment) {
if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP) {
- MUTEX_RELEASE(lock);
- return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
+ MUTEX_RELEASE(lock_od);
+ return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
}
DDebug("cache_read_agg", "%p: key: %X writer: closed:%d, fragment:%d, retry: %d", this, first_key.slice32(1), write_vc->closed,
write_vc->fragment, writer_lock_retry);
@@ -592,6 +612,8 @@ CacheVC::openReadReadDone(int event, Event *e)
goto Lcallreturn;
return EVENT_CONT;
} else if (write_vc) {
+ ink_release_assert(! "[amc] Handle this");
+# if 0
if (writer_done()) {
last_collision = NULL;
while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
@@ -610,6 +632,7 @@ CacheVC::openReadReadDone(int event, Event *e)
}
DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
VC_SCHED_WRITER_RETRY(); // wait for writer
+# endif
}
// fall through for truncated documents
}
@@ -617,126 +640,164 @@ Lerror:
char tmpstring[100];
Warning("Document %s truncated", earliest_key.toHexStr(tmpstring));
return calluser(VC_EVENT_ERROR);
-Ldone:
+//Ldone:
return calluser(VC_EVENT_EOS);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, 0);
LreadMain:
- fragment++;
+ ++fragment;
doc_pos = doc->prefix_len();
+ doc_pos += resp_range.getOffset() - frag_upper_bound; // used before update!
+ frag_upper_bound += doc->data_len();
next_CacheKey(&key, &key);
SET_HANDLER(&CacheVC::openReadMain);
return openReadMain(event, e);
}
-int
-CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+void
+CacheVC::update_key_to_frag_idx(int target)
{
- cancel_trigger();
- Doc *doc = (Doc *)buf->data();
- int64_t ntodo = vio.ntodo();
- int64_t bytes = doc->len - doc_pos;
- IOBufferBlock *b = NULL;
- if (seek_to) { // handle do_io_pread
- if (seek_to >= doc_len) {
- vio.ndone = doc_len;
- return calluser(VC_EVENT_EOS);
- }
-#ifdef HTTP_CACHE
- HTTPInfo::FragOffset *frags = alternate.get_frag_table();
- if (is_debug_tag_set("cache_seek")) {
- char b[33], c[33];
- Debug("cache_seek", "Seek @ %" PRId64 " in %s from #%d @ %" PRId64 "/%d:%s", seek_to, first_key.toHexStr(b), fragment,
- doc_pos, doc->len, doc->key.toHexStr(c));
- }
- /* Because single fragment objects can migrate to hang off an alt vector
- they can appear to the VC as multi-fragment when they are not really.
- The essential difference is the existence of a fragment table.
- */
- if (frags) {
- int target = 0;
- HTTPInfo::FragOffset next_off = frags[target];
- int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
- ink_assert(lfi >= 0); // because it's not a single frag doc.
-
- /* Note: frag[i].offset is the offset of the first byte past the
- i'th fragment. So frag[0].offset is the offset of the first
- byte of fragment 1. In addition the # of fragments is one
- more than the fragment table length, the start of the last
- fragment being the last offset in the table.
- */
- if (fragment == 0 || seek_to < frags[fragment - 1] || (fragment <= lfi && frags[fragment] <= seek_to)) {
- // search from frag 0 on to find the proper frag
- while (seek_to >= next_off && target < lfi) {
- next_off = frags[++target];
- }
- if (target == lfi && seek_to >= next_off)
- ++target;
- } else { // shortcut if we are in the fragment already
- target = fragment;
- }
- if (target != fragment) {
- // Lread will read the next fragment always, so if that
- // is the one we want, we don't need to do anything
- int cfi = fragment;
- --target;
- while (target > fragment) {
- next_CacheKey(&key, &key);
- ++fragment;
- }
+ FragmentDescriptorTable& frags = (*alternate.get_frag_table());
+
+ if (0 == target) {
+ fragment = 0;
+ key = earliest_key;
+ } else if (! frags[target].m_key.is_zero()) {
+ key = frags[target].m_key;
+ } else { // step through the hard way
+ if (target < fragment) { // going backwards
+ if (target < (fragment - target)) { // faster to go from start
+ fragment = 0;
+ key = earliest_key;
+ } else { // quicker to go from here to there
while (target < fragment) {
prev_CacheKey(&key, &key);
--fragment;
+ if (frags[fragment].m_key.is_zero())
+ frags[fragment].m_key = key; // might as well store it as we go.
+ else ink_assert(frags[fragment].m_key == key);
}
-
- if (is_debug_tag_set("cache_seek")) {
- char target_key_str[33];
- key.toHexStr(target_key_str);
- Debug("cache_seek", "Seek #%d @ %" PRId64 " -> #%d @ %" PRId64 ":%s", cfi, doc_pos, target, seek_to, target_key_str);
- }
- goto Lread;
}
}
- doc_pos = doc->prefix_len() + seek_to;
- if (fragment)
- doc_pos -= static_cast<int64_t>(frags[fragment - 1]);
- vio.ndone = 0;
- seek_to = 0;
- ntodo = vio.ntodo();
- bytes = doc->len - doc_pos;
- if (is_debug_tag_set("cache_seek")) {
- char target_key_str[33];
- key.toHexStr(target_key_str);
- Debug("cache_seek", "Read # %d @ %" PRId64 "/%d for %" PRId64, fragment, doc_pos, doc->len, bytes);
+ // advance to target if we're not already there.
+ while (target > fragment) {
+ next_CacheKey(&key, &key);
+ ++fragment;
+ if (frags[fragment].m_key.is_zero())
+ frags[fragment].m_key = key; // might as well store it as we go.
+ else ink_assert(frags[fragment].m_key == key);
}
-#endif
}
- if (ntodo <= 0)
- return EVENT_CONT;
- if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
+}
+
+int
+CacheVC::frag_idx_for_offset(uint64_t offset)
+{
+ FragmentDescriptorTable* frags = alternate.get_frag_table();
+ int count = alternate.get_frag_count();
+ uint32_t ffs = alternate.get_frag_fixed_size();
+ int idx = count / 2;
+
+ ink_assert(offset < doc_len);
+
+ if (ffs) idx = offset / ffs; // good guess as to the right offset.
+
+ if (count > 1 && 0 == (*frags)[1].m_offset) ++idx;
+
+ do {
+ uint64_t upper = idx >= count ? doc_len : (*frags)[idx+1].m_offset;
+ uint64_t lower = idx <= 0 ? 0 : (*frags)[idx].m_offset;
+ if (offset < lower) idx = idx / 2;
+ else if (offset >= upper) idx = (count + idx + 1)/2;
+ else break;
+ } while (true);
+ return idx;
+}
+
+/* There is a fragment available, decide what do to next.
+ */
+int
+CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+{
+ cancel_trigger();
+ Doc *doc = (Doc *) buf->data();
+ int64_t bytes = vio.ntodo();
+ IOBufferBlock *b = NULL;
+ uint64_t target_offset = resp_range.getOffset();
+ uint64_t lower_bound = frag_upper_bound - doc->data_len();
+
+ if (bytes <= 0)
return EVENT_CONT;
- if ((bytes <= 0) && vio.ntodo() >= 0)
- goto Lread;
- if (bytes > vio.ntodo())
- bytes = vio.ntodo();
- b = new_IOBufferBlock(buf, bytes, doc_pos);
- b->_buf_end = b->_end;
- vio.buffer.writer()->append_block(b);
- vio.ndone += bytes;
- doc_pos += bytes;
- if (vio.ntodo() <= 0)
- return calluser(VC_EVENT_READ_COMPLETE);
- else {
- if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
+
+ // Start shipping
+ while (bytes > 0 && lower_bound <= target_offset && target_offset < frag_upper_bound) {
+ if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // wait for reader
+ return EVENT_CONT;
+
+ if (resp_range.hasPendingRangeShift()) { // in new range, shift to start location.
+ int b_len;
+ char const* b_str = resp_range.getBoundaryStr(&b_len);
+ size_t r_idx = resp_range.getIdx();
+
+ doc_pos = doc->prefix_len() + (target_offset - lower_bound);
+
+ vio.ndone += HTTPRangeSpec::writePartBoundary(vio.buffer.writer(), b_str, b_len
+ , doc_len, resp_range[r_idx]._min, resp_range[r_idx]._max
+ , resp_range.getContentTypeField(), r_idx >= (resp_range.count() - 1)
+ );
+ resp_range.consumeRangeShift();
+ Debug("amc", "Range boundary for range %" PRIu64, r_idx);
+ }
+
+ bytes = std::min(doc->len - doc_pos, static_cast<int64_t>(resp_range.getRemnantSize()));
+ bytes = std::min(bytes, vio.ntodo());
+ if (bytes > 0) {
+ b = new_IOBufferBlock(buf, bytes, doc_pos);
+ b->_buf_end = b->_end;
+ vio.buffer.writer()->append_block(b);
+ vio.ndone += bytes;
+ doc_pos += bytes;
+ resp_range.consume(bytes);
+ Debug("amc", "shipped %" PRId64 " bytes at target offset %" PRIu64, bytes, target_offset);
+ target_offset = resp_range.getOffset();
+ }
+
+ if (vio.ntodo() <= 0)
+ return calluser(VC_EVENT_READ_COMPLETE);
+ else if (calluser(VC_EVENT_READ_READY) == EVENT_DONE)
return EVENT_DONE;
- // we have to keep reading until we give the user all the
- // bytes it wanted or we hit the watermark.
- if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water())
- goto Lread;
- return EVENT_CONT;
}
-Lread : {
- if (vio.ndone >= (int64_t)doc_len)
+
+
+#ifdef HTTP_CACHE
+ if (resp_range.getRemnantSize()) {
+ FragmentDescriptorTable* frags = alternate.get_frag_table();
+ int n_frags = alternate.get_frag_count();
+
+ // Quick check for offset in next fragment - very common
+ if (target_offset >= frag_upper_bound && (!frags || fragment > n_frags || target_offset < (*frags)[fragment].m_offset)) {
+ Debug("amc", "Non-seeking continuation to next fragment");
+ } else {
+ int target = -1; // target fragment index.
+
+ if (is_debug_tag_set("amc")) {
+ char b[33], c[33];
+ Debug("amc", "Seek @ %" PRIu64 " [r#=%d] in %s from #%d @ %" PRIu64 "/%d/%" PRId64 ":%s%s",
+ target_offset, resp_range.getIdx(), first_key.toHexStr(b), fragment, frag_upper_bound, doc->len, doc->total_len, doc->key.toHexStr(c)
+ , (frags ? "" : "no frag table")
+ );
+ }
+
+ target = this->frag_idx_for_offset(target_offset);
+ this->update_key_to_frag_idx(target);
+ /// one frag short, because it gets bumped when the fragment is actually read.
+ frag_upper_bound = target > 0 ? (*frags)[target].m_offset : 0;
+ Debug("amc", "Fragment seek from %d to %d target offset %" PRIu64, fragment - 1, target, target_offset);
+ }
+ }
+#endif
+
+ if (vio.ntodo() > 0 && 0 == resp_range.getRemnantSize())
// reached the end of the document and the user still wants more
return calluser(VC_EVENT_EOS);
last_collision = 0;
@@ -754,41 +815,84 @@ Lread : {
if (dir_probe(&key, vol, &dir, &last_collision)) {
SET_HANDLER(&CacheVC::openReadReadDone);
int ret = do_read_call(&key);
- if (ret == EVENT_RETURN)
- goto Lcallreturn;
+ if (ret == EVENT_RETURN) {
+ lock.release();
+ return handleEvent(AIO_EVENT_DONE, 0);
+ }
return EVENT_CONT;
} else if (write_vc) {
+ ink_release_assert(! "[amc] Handle case where fragment is not in the directory during read");
+ /* Check for an OpenDirEntry - if none, no hope. If so, see if any of the writers in it are still
+ planning to write this fragment (might check against all fragments - if any aren't going to
+ be written, might just give up at that point rather than hope some other user agent will happen
+ to ask for the missing fragments. Although, if there's a herd, that might not be so far fetched).
+ */
+# if 0
if (writer_done()) {
last_collision = NULL;
while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
- DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d", this, first_key.slice32(1), (int)vio.ndone);
+ DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d",
+ this, first_key.slice32(1), (int)vio.ndone);
doc_len = vio.ndone;
goto Leos;
}
}
- DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
+ DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d",
+ this, first_key.slice32(1), (int)vio.ndone);
goto Lerror;
}
DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
SET_HANDLER(&CacheVC::openReadMain);
VC_SCHED_WRITER_RETRY();
+# endif
}
if (is_action_tag_set("cache"))
ink_release_assert(false);
- Warning("Document %X truncated at %d of %d, missing fragment %X", first_key.slice32(1), (int)vio.ndone, (int)doc_len,
- key.slice32(1));
+ Warning("Document %X truncated at %d of %d, missing fragment %X", first_key.slice32(1), (int)vio.ndone, (int)doc_len, key.slice32(1));
// remove the directory entry
dir_delete(&earliest_key, vol, &earliest_dir);
-}
-Lerror:
+ lock.release();
+//Lerror:
return calluser(VC_EVENT_ERROR);
-Leos:
+//Leos:
return calluser(VC_EVENT_EOS);
-Lcallreturn:
- return handleEvent(AIO_EVENT_DONE, 0);
}
+int
+CacheVC::openReadWaitEarliest(int evid, Event*)
+{
+ int zret = EVENT_CONT;
+ cancel_trigger();
+
+ CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+ if (!lock.is_locked())
+ VC_SCHED_LOCK_RETRY();
+ Debug("amc", "[CacheVC::openReadWaitEarliest] [%d]", evid);
+ if (NULL == vol->open_read(&first_key)) {
+ // Writer is gone, so no more data for which to wait.
+ // Best option is to just start over from the first frag.
+ // Most likely scenario - object turned out to be a resident alternate so
+ // there's no explicit earliest frag.
+ lock.release();
+ SET_HANDLER(&self::openReadStartHead);
+ od = NULL;
+ key = first_key;
+ return handleEvent(EVENT_IMMEDIATE, 0);
+ } else if (dir_probe(&key, vol, &earliest_dir, &last_collision) ||
+ dir_lookaside_probe(&key, vol, &earliest_dir, NULL))
+ {
+ dir = earliest_dir;
+ SET_HANDLER(&self::openReadStartEarliest);
+ if ((zret = do_read_call(&key)) == EVENT_RETURN) {
+ lock.release();
+ return handleEvent(AIO_EVENT_DONE, 0);
+ }
+ }
+ return zret;
+}
+
+
/*
This code follows CacheVC::openReadStartHead closely,
if you change this you might have to change that.
@@ -843,6 +947,8 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
earliest_key = key;
doc_pos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ frag_upper_bound = doc->data_len();
vol->begin_read(this);
if (vol->within_hit_evacuate_window(&earliest_dir) &&
(!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
@@ -859,7 +965,7 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
if (dir_probe(&key, vol, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, vol, &earliest_dir, NULL)) {
dir = earliest_dir;
#if TS_USE_INTERIM_CACHE == 1
- if (dir_ininterim(&dir) && alternate.get_frag_offset_count() > 1) {
+ if (dir_ininterim(&dir) && alternate.get_frag_count() > 1) {
dir_delete(&key, vol, &dir);
last_collision = NULL;
goto Lread;
@@ -872,7 +978,19 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
// read has detected that alternate does not exist in the cache.
// rewrite the vector.
#ifdef HTTP_CACHE
- if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
+ // It's OK if there's a writer for this alternate, we can wait on it.
+ if (od && od->has_writer(earliest_key)) {
+ wake_up_thread = mutex->thread_holding;
+ od->waiting_for(earliest_key, this, 0);
+ lock.release();
+ // The SM must be signaled that the cache read is open even if we haven't got the earliest frag
+ // yet because otherwise it won't set up the read side of the tunnel before the write side finishes
+ // and terminates the SM (in the case of a resident alternate). But the VC can't be left with this
+ // handler or it will confuse itself when it wakes up from the earliest frag read. So we put it
+ // in a special wait state / handler and then signal the SM.
+ SET_HANDLER(&self::openReadWaitEarliest);
+ return callcont(CACHE_EVENT_OPEN_READ); // must signal read is open
+ } else if (frag_type == CACHE_FRAG_TYPE_HTTP) {
// don't want any writers while we are evacuating the vector
if (!vol->open_write(this, false, 1)) {
Doc *doc1 = (Doc *)first_buf->data();
@@ -1000,6 +1118,10 @@ Lrestart:
/*
This code follows CacheVC::openReadStartEarliest closely,
if you change this you might have to change that.
+
+ This handles the I/O completion of reading the first doc of the object.
+ If there are alternates, we chain to openreadStartEarliest to read the
+ earliest doc.
*/
int
CacheVC::openReadStartHead(int event, Event *e)
@@ -1097,13 +1219,16 @@ CacheVC::openReadStartHead(int event, Event *e)
err = ECACHE_BAD_META_DATA;
goto Ldone;
}
- if (cache_config_select_alternate) {
+ // If @a params is @c NULL then we're a retry from a range request pair so don't do alt select.
+ // Instead try the @a earliest_key - if that's a match then that's the correct alt, written
+ // by the paired write VC.
+ if (cache_config_select_alternate && params) {
alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
if (alternate_index < 0) {
err = ECACHE_ALT_MISS;
goto Ldone;
}
- } else
+ } else if (CACHE_ALT_INDEX_DEFAULT == (alternate_index = get_alternate_index(&vector, earliest_key)))
alternate_index = 0;
alternate_tmp = vector.get(alternate_index);
if (!alternate_tmp->valid()) {
@@ -1117,12 +1242,30 @@ CacheVC::openReadStartHead(int event, Event *e)
alternate.copy_shallow(alternate_tmp);
alternate.object_key_get(&key);
doc_len = alternate.object_size_get();
- if (key == doc->key) { // is this my data?
+
+ // Handle any range related setup.
+ if (!resp_range.init(&request)) { // shouldn't this have been checked earlier?
+ err = ECACHE_INVALID_RANGE;
+ goto Ldone;
+ }
+ // If the object length is known we can check the range.
+ // Otherwise we have to leave it vague and talk to the origin to get full length info.
+ if (alternate.m_alt->m_flag.content_length_p && !resp_range.apply(doc_len)) {
+ err = ECACHE_UNSATISFIABLE_RANGE;
+ goto Ldone;
+ }
+ if (resp_range.isMulti())
+ resp_range.setContentTypeFromResponse(alternate.response_get()).generateBoundaryStr(earliest_key);
+// alternate.get_uncached(resp_range.getRangeSpec(), uncached_range.getRangeSpec());
+
+ if (key == doc->key) { // is this my data?
f.single_fragment = doc->single_fragment();
ink_assert(f.single_fragment); // otherwise need to read earliest
ink_assert(doc->hlen);
doc_pos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ frag_upper_bound = doc->data_len();
} else {
f.single_fragment = false;
}
@@ -1130,6 +1273,8 @@ CacheVC::openReadStartHead(int event, Event *e)
#endif
{
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ frag_upper_bound = doc->data_len();
f.single_fragment = doc->single_fragment();
doc_pos = doc->prefix_len();
doc_len = doc->total_len;
@@ -1140,7 +1285,7 @@ CacheVC::openReadStartHead(int event, Event *e)
Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64 " bytes, %d fragments",
doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", doc->len, doc->total_len,
#ifdef HTTP_CACHE
- alternate.get_frag_offset_count()
+ alternate.get_frag_count()
#else
0
#endif
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/799a83bb/iocore/cache/CacheWrite.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheWrite.cc b/iocore/cache/CacheWrite.cc
index 4e6319e..18db8c1 100644
--- a/iocore/cache/CacheWrite.cc
+++ b/iocore/cache/CacheWrite.cc
@@ -33,13 +33,18 @@
// used to get the alternate which is actually present in the document
#ifdef HTTP_CACHE
int
-get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
+get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key, int idx)
{
int alt_count = cache_vector->count();
CacheHTTPInfo *obj;
if (!alt_count)
return -1;
+ // See if the hint is correct.
+ if (0 <= idx && idx < alt_count && cache_vector->get(idx)->compare_object_key(&key))
+ return idx;
+ // Otherwise scan the vector.
for (int i = 0; i < alt_count; i++) {
+ if (i == idx) continue; // already checked that one.
obj = cache_vector->get(i);
if (obj->compare_object_key(&key)) {
// Debug("cache_key", "Resident alternate key %X", key.slice32(0));
@@ -63,20 +68,21 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
VC_SCHED_LOCK_RETRY();
int ret = 0;
{
- CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, od->mutex, mutex->thread_holding);
if (!lock.is_locked() || od->writing_vec)
VC_SCHED_LOCK_RETRY();
int vec = alternate.valid();
if (f.update) {
// all Update cases. Need to get the alternate index.
- alternate_index = get_alternate_index(write_vector, update_key);
- Debug("cache_update", "updating alternate index %d frags %d", alternate_index,
- alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
+ alternate_index = get_alternate_index(write_vector, update_key, alternate_index);
+ Debug("cache_update", "updating alternate index %d frags %d", alternate_index, alternate_index >=0 ? write_vector->get(alternate_index)->get_frag_count() : -1);
// if its an alternate delete
if (!vec) {
ink_assert(!total_len);
if (alternate_index >= 0) {
+ MUTEX_TRY_LOCK(stripe_lock, vol->mutex, mutex->thread_holding);
+ if (!stripe_lock.is_locked()) VC_SCHED_LOCK_RETRY();
write_vector->remove(alternate_index, true);
alternate_index = CACHE_ALT_REMOVED;
if (!write_vector->count())
@@ -98,12 +104,6 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
write_vector->remove(0, true);
}
if (vec) {
- /* preserve fragment offset data from old info. This method is
- called iff the update is a header only update so the fragment
- data should remain valid.
- */
- if (alternate_index >= 0)
- alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
alternate_index = write_vector->insert(&alternate, alternate_index);
}
@@ -134,7 +134,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
od->writing_vec = 1;
f.use_first_key = 1;
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
- ret = do_write_call();
+ ret = do_write_lock_call();
}
if (ret == EVENT_RETURN)
return handleEvent(AIO_EVENT_DONE, 0);
@@ -161,7 +161,7 @@ CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
(f.use_fist_key || f.evac_vector) is set. Write_vector is written to disk
- alternate_index. Used only if write_vector needs to be written to disk.
Used to find out the VC's alternate in the write_vector and set its
- length to tatal_len.
+ length to total_len.
- write_len. The number of bytes for this fragment.
- total_len. The total number of bytes for the document so far.
Doc->total_len and alternate's total len is set to this value.
@@ -316,8 +316,8 @@ Vol::aggWriteDone(int event, Event *e)
header->last_write_pos = header->write_pos;
header->write_pos += io.aiocb.aio_nbytes;
ink_assert(header->write_pos >= start);
- DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n", hash_text.get(), header->write_pos,
- header->last_write_pos);
+ Debug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "\n",
+ hash_text.get(), header->write_pos, header->last_write_pos);
ink_assert(header->write_pos == header->agg_pos);
if (header->write_pos + EVACUATION_SIZE > scan_pos)
periodic_scan();
@@ -726,7 +726,7 @@ agg_copy(char *p, CacheVC *vc)
IOBufferBlock *res_alt_blk = 0;
uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
- ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
+ ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc || 0 == vc->fragment);
ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
// update copy of directory entry for this document
dir_set_approx_size(&vc->dir, vc->agg_len);
@@ -788,16 +788,21 @@ agg_copy(char *p, CacheVC *vc)
#ifdef HTTP_CACHE
if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
ink_assert(vc->write_vector->count() > 0);
- if (!vc->f.update && !vc->f.evac_vector) {
- ink_assert(!(vc->first_key == zero_key));
- CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
- http_info->object_size_set(vc->total_len);
- }
- // update + data_written => Update case (b)
- // need to change the old alternate's object length
- if (vc->f.update && vc->total_len) {
- CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
- http_info->object_size_set(vc->total_len);
+ if (vc->resp_range.hasRanges()) {
+ int64_t size = vc->alternate.object_size_get();
+ if (size >= 0) doc->total_len = size;
+ } else {
+ if (!vc->f.update && !vc->f.evac_vector) {
+ ink_assert(!(vc->first_key == zero_key));
+ CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+ http_info->object_size_set(vc->total_len);
+ }
+ // update + data_written => Update case (b)
+ // need to change the old alternate's object length
+ if (vc->f.update && vc->total_len) {
+ CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+ http_info->object_size_set(vc->total_len);
+ }
}
ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
@@ -1068,7 +1073,36 @@ Lwait:
}
int
-CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
+CacheVC::openWriteEmptyEarliestDone(int event, Event *e)
+{
+ cancel_trigger();
+ if (event == AIO_EVENT_DONE)
+ set_io_not_in_progress();
+ else if (is_io_in_progress())
+ return EVENT_CONT;
+
+ {
+ MUTEX_LOCK(lock, od->mutex, this_ethread());
+ alternate_index = get_alternate_index(write_vector, this->earliest_key);
+ od->write_complete(key, this, io.ok()); // in any case, the IO is over.
+ key = od->key_for(earliest_key, write_pos);
+ }
+
+ SET_HANDLER(&CacheVC::openWriteMain);
+
+ // on error terminate if we're already closed, otherwise notify external continuation.
+ if (!io.ok()) {
+ if (closed) {
+ closed = -1;
+ return die();
+ }
+ return calluser(VC_EVENT_ERROR);
+ }
+ return this->openWriteMain(event, e); // go back to writing our actual data.
+}
+
+int
+CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event */* e ATS_UNUSED */)
{
cancel_trigger();
{
@@ -1078,6 +1112,7 @@ CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED *
ink_assert(!is_io_in_progress());
VC_SCHED_LOCK_RETRY();
}
+ od->close_writer(earliest_key, this);
vol->close_write(this);
if (closed < 0 && fragment)
dir_delete(&earliest_key, vol, &earliest_dir);
@@ -1179,13 +1214,16 @@ CacheVC::openWriteCloseHead(int event, Event *e)
cancel_trigger();
f.use_first_key = 1;
if (io.ok())
- ink_assert(fragment || (length == (int64_t)total_len));
+ ink_assert(fragment || (length == (int64_t)total_len) || (resp_range.hasRanges() && alternate.object_size_get() > alternate.get_frag_fixed_size()));
else
return openWriteCloseDir(event, e);
- if (f.data_done)
+ if (f.data_done) {
write_len = 0;
- else
+ } else {
write_len = length;
+ // If we're writing data in the first / header doc, then it's a resident alt.
+ alternate.m_alt->m_flag.complete_p = true;
+ }
#ifdef HTTP_CACHE
if (frag_type == CACHE_FRAG_TYPE_HTTP) {
SET_HANDLER(&CacheVC::updateVector);
@@ -1216,35 +1254,33 @@ CacheVC::openWriteCloseDataDone(int event, Event *e)
CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
if (!lock.is_locked())
VC_LOCK_RETRY_EVENT();
- if (!fragment) {
- ink_assert(key == earliest_key);
- earliest_dir = dir;
-#ifdef HTTP_CACHE
- } else {
- // Store the offset only if there is a table.
- // Currently there is no alt (and thence no table) for non-HTTP.
- if (alternate.valid())
- alternate.push_frag_offset(write_pos);
-#endif
- }
- fragment++;
- write_pos += write_len;
dir_insert(&key, vol, &dir);
- blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
- next_CacheKey(&key, &key);
- if (length) {
- write_len = length;
- if (write_len > MAX_FRAG_SIZE)
- write_len = MAX_FRAG_SIZE;
- if ((ret = do_write_call()) == EVENT_RETURN)
- goto Lcallreturn;
- return ret;
- }
- f.data_done = 1;
- return openWriteCloseHead(event, e); // must be called under vol lock from here
}
-Lcallreturn:
- return handleEvent(AIO_EVENT_DONE, 0);
+
+ if (key == earliest_key)
+ earliest_dir = dir;
+
+ {
+ MUTEX_LOCK(lock, od->mutex, mutex->thread_holding);
+ write_vector->write_complete(earliest_key, this, true);
+ }
+
+ write_pos += write_len;
+ blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
+ next_CacheKey(&key, &key);
+ if (length) {
+ write_len = length;
+ if (write_len > MAX_FRAG_SIZE)
+ write_len = MAX_FRAG_SIZE;
+ if ((ret = do_write_call()) == EVENT_RETURN)
+ return handleEvent(AIO_EVENT_DONE, 0);
+ return ret;
+ }
+
+ f.data_done = 1;
+ return openWriteCloseHead(event, e); // must be called under vol lock from here
+ // [amc] don't see why, guess we'll find out.
+
}
int
@@ -1277,8 +1313,9 @@ CacheVC::openWriteClose(int event, Event *e)
return openWriteCloseDir(event, e);
#endif
}
- if (length && (fragment || length > MAX_FRAG_SIZE)) {
+ if (length && (fragment || length > MAX_FRAG_SIZE || alternate.object_size_get() > alternate.get_frag_fixed_size())) {
SET_HANDLER(&CacheVC::openWriteCloseDataDone);
+ this->updateWriteStateFromRange();
write_len = length;
if (write_len > MAX_FRAG_SIZE)
write_len = MAX_FRAG_SIZE;
@@ -1306,48 +1343,92 @@ CacheVC::openWriteWriteDone(int event, Event *e)
SET_HANDLER(&CacheVC::openWriteMain);
return calluser(VC_EVENT_ERROR);
}
+
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked())
VC_LOCK_RETRY_EVENT();
- // store the earliest directory. Need to remove the earliest dir
- // in case the writer aborts.
- if (!fragment) {
- ink_assert(key == earliest_key);
- earliest_dir = dir;
-#ifdef HTTP_CACHE
- } else {
- // Store the offset only if there is a table.
- // Currently there is no alt (and thence no table) for non-HTTP.
- if (alternate.valid())
- alternate.push_frag_offset(write_pos);
-#endif
- }
- ++fragment;
- write_pos += write_len;
dir_insert(&key, vol, &dir);
- DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
- blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
- next_CacheKey(&key, &key);
}
+
+ if (key == earliest_key)
+ earliest_dir = dir;
+
+ {
+ MUTEX_LOCK(lock, od->mutex, mutex->thread_holding);
+ write_vector->write_complete(earliest_key, this, true);
+ }
+
+ DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
+
+ resp_range.consume(write_len);
+ blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
+
if (closed)
return die();
SET_HANDLER(&CacheVC::openWriteMain);
return openWriteMain(event, e);
}
-static inline int
-target_fragment_size()
-{
+int64_t
+CacheProcessor::get_fixed_fragment_size() const {
return cache_config_target_fragment_size - sizeofDoc;
}
+
+Action*
+CacheVC::do_init_write()
+{
+ Debug("amc", "[do_init_write] vc=%p", this);
+ SET_CONTINUATION_HANDLER(this, &CacheVC::openWriteInit);
+ return EVENT_DONE == this->openWriteInit(EVENT_IMMEDIATE, 0) ? ACTION_RESULT_DONE : &_action;
+}
+
+/* Do some initial setup and then switch over to openWriteMain
+ */
+int
+CacheVC::openWriteInit(int eid, Event* event)
+{
+ Debug("amc", "[openWriteInit] vc=%p", this);
+ {
+ CACHE_TRY_LOCK(lock, od->mutex, mutex->thread_holding);
+ if (!lock.is_locked()) {
+ trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), eid);
+ return EVENT_CONT;
+ }
+
+ // If we're not already in the alt vector, insert.
+ if (-1 == (alternate_index = get_alternate_index(write_vector, first_key))) {
+ alternate_index = write_vector->insert(&alternate);
+ }
+ // mark us as an writer.
+ write_vector->data[alternate_index]._writers.push(this);
+
+ if (this == od->open_writer) {
+ od->open_writer = NULL;
+ CacheVC* reader;
+ while (NULL != (reader = od->open_waiting.pop())) {
+ Debug("amc", "[CacheVC::openWriteInit] wake up %p", reader);
+ reader->wake_up_thread->schedule_imm(reader);
+ }
+ }
+ }
+
+ if (resp_range.hasRanges()) resp_range.start();
+// write_pos = resp_range.getOffset();
+// key = alternate.get_frag_key_of(write_pos);
+ SET_HANDLER(&CacheVC::openWriteMain);
+ return openWriteMain(eid, event);
+// return EVENT_DONE;
+}
+
int
CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
int called_user = 0;
ink_assert(!is_io_in_progress());
+ Debug("amc", "[CacheVC::openWriteMain]");
Lagain:
if (!vio.buffer.writer()) {
if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE)
@@ -1363,10 +1444,15 @@ Lagain:
if (vio.ntodo() <= 0)
return EVENT_CONT;
}
+
int64_t ntodo = (int64_t)(vio.ntodo() + length);
int64_t total_avail = vio.buffer.reader()->read_avail();
int64_t avail = total_avail;
int64_t towrite = avail + length;
+ int64_t ffs = cacheProcessor.get_fixed_fragment_size();
+
+ Debug("amc", "[CacheVC::openWriteMain] ntodo=%" PRId64 " avail=%" PRId64 " towrite=%" PRId64, ntodo, avail, towrite);
+
if (towrite > ntodo) {
avail -= (towrite - ntodo);
towrite = ntodo;
@@ -1379,17 +1465,20 @@ Lagain:
blocks = vio.buffer.reader()->block;
offset = vio.buffer.reader()->start_offset;
}
+
if (avail > 0) {
vio.buffer.reader()->consume(avail);
vio.ndone += avail;
total_len += avail;
}
length = (uint64_t)towrite;
- if (length > target_fragment_size() && (length < target_fragment_size() + target_fragment_size() / 4))
- write_len = target_fragment_size();
+ // [amc] Need to change this to be exactly the fixed fragment size for this alternate.
+ if (length > ffs &&
+ (length < ffs + ffs / 4))
+ write_len = ffs;
else
write_len = length;
- bool not_writing = towrite != ntodo && towrite < target_fragment_size();
+ bool not_writing = towrite != ntodo && towrite < ffs;
if (!called_user) {
if (not_writing) {
called_user = 1;
@@ -1401,12 +1490,56 @@ Lagain:
}
if (not_writing)
return EVENT_CONT;
+
+ ink_assert(static_cast<int64_t>(write_pos) == alternate.get_frag_offset(fragment));
+ {
+ CacheHTTPInfo *alt = 0;
+ MUTEX_LOCK(lock, od->mutex, this_ethread());
+
+ this->updateWriteStateFromRange();
+ alternate_index = get_alternate_index(write_vector, earliest_key);
+ if (alternate_index < 0)
+ alternate_index = write_vector->insert(&alternate, alternate_index);
+
+ alt = write_vector->get(alternate_index);
+
+ if (fragment != 0 && !alt->m_alt->m_earliest.m_flag.cached_p) {
+ SET_HANDLER(&CacheVC::openWriteEmptyEarliestDone);
+ if (!od->is_write_active(earliest_key, 0)) {
+ write_len = 0;
+ key = earliest_key;
+ Debug("amc", "[CacheVC::openWriteMain] writing empty earliest");
+ } else {
+ // go on the wait list
+ od->waiting_for(earliest_key, this, 0);
+ not_writing = true;
+ }
+ } else if (od->is_write_active(earliest_key, write_pos)) {
+ od->waiting_for(earliest_key, this, write_pos);
+ not_writing = true;
+ } else if (alternate.is_frag_cached(fragment)) {
+ not_writing = true;
+ resp_range.consume(length); // just drop it.
+ Debug("amc", "Fragment %d already cached", fragment);
+ // Need to consume the actual data too (in blocks/offset).
+ } else {
+ od->write_active(earliest_key, this, write_pos);
+ }
+ }
+
+ if (0 == write_len) // need to set up the write not under OpenDir lock.
+ return do_write_lock_call();
+
if (towrite == ntodo && f.close_complete) {
closed = 1;
SET_HANDLER(&CacheVC::openWriteClose);
return openWriteClose(EVENT_NONE, NULL);
+ } else if (not_writing) {
+ return EVENT_CONT;
}
+
SET_HANDLER(&CacheVC::openWriteWriteDone);
+ Debug("amc", "[CacheVC::openWriteMain] doing write call");
return do_write_lock_call();
}
@@ -1444,7 +1577,7 @@ Lcollision : {
}
}
Ldone:
- SET_HANDLER(&CacheVC::openWriteMain);
+ SET_HANDLER(&CacheVC::openWriteInit);
return callcont(CACHE_EVENT_OPEN_WRITE);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
@@ -1539,8 +1672,8 @@ CacheVC::openWriteStartDone(int event, Event *e)
goto Lfailure;
if (od->has_multiple_writers()) {
MUTEX_RELEASE(lock);
- SET_HANDLER(&CacheVC::openWriteMain);
- return callcont(CACHE_EVENT_OPEN_WRITE);
+ SET_HANDLER(&CacheVC::openWriteInit);
+ return this->openWriteInit(EVENT_IMMEDIATE, 0);
}
}
// check for collision
@@ -1560,8 +1693,8 @@ Lsuccess:
od->reading_vec = 0;
if (_action.cancelled)
goto Lcancel;
- SET_HANDLER(&CacheVC::openWriteMain);
- return callcont(CACHE_EVENT_OPEN_WRITE);
+ SET_HANDLER(&CacheVC::openWriteInit);
+ return this->openWriteInit(EVENT_IMMEDIATE, 0);
Lfailure:
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
@@ -1598,7 +1731,7 @@ CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED
return openWriteOverwrite(EVENT_IMMEDIATE, 0);
} else {
// write by key
- SET_HANDLER(&CacheVC::openWriteMain);
+ SET_HANDLER(&CacheVC::openWriteInit);
return callcont(CACHE_EVENT_OPEN_WRITE);
}
}
@@ -1658,7 +1791,7 @@ Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, in
return &c->_action;
}
if (!c->f.overwrite) {
- SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteInit);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
} else {
@@ -1670,6 +1803,21 @@ Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, in
}
}
+int
+CacheVC::updateWriteStateFromRange()
+{
+ if (resp_range.hasPendingRangeShift())
+ resp_range.consumeRangeShift();
+ write_pos = resp_range.getOffset();
+ fragment = alternate.get_frag_index_of(write_pos);
+ key = alternate.get_frag_key(fragment);
+ {
+ char tmp[64];
+ Debug("amc", "[writeMain] pos=%" PRId64 " frag=%d/%" PRId64 " key=%s", write_pos, fragment, alternate.get_frag_offset(fragment), key.toHexStr(tmp));
+ }
+ return write_pos;
+}
+
#ifdef HTTP_CACHE
// main entry point for writing of http documents
Action *
@@ -1762,6 +1910,8 @@ Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t
goto Lfailure;
}
// document doesn't exist, begin write
+ ink_assert(NULL == c->od->open_writer);
+ c->od->open_writer = c;
goto Lmiss;
} else {
c->od->reading_vec = 1;
@@ -1784,7 +1934,8 @@ Cache::open_write(Continuation *cont, CacheKey *key, CacheHTTPInfo *info, time_t
}
Lmiss:
- SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
+// return c->do_init_write();
+ SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteInit);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/799a83bb/iocore/cache/I_Cache.h
----------------------------------------------------------------------
diff --git a/iocore/cache/I_Cache.h b/iocore/cache/I_Cache.h
index af99d30..7ee6b37 100644
--- a/iocore/cache/I_Cache.h
+++ b/iocore/cache/I_Cache.h
@@ -49,6 +49,7 @@
#define CACHE_COMPRESSION_LIBZ 2
#define CACHE_COMPRESSION_LIBLZMA 3
+struct CacheVConnection;
struct CacheVC;
struct CacheDisk;
#ifdef HTTP_CACHE
@@ -56,6 +57,7 @@ class CacheLookupHttpConfig;
class URL;
class HTTPHdr;
class HTTPInfo;
+class HTTPRangeSpec;
typedef HTTPHdr CacheHTTPHdr;
typedef URL CacheURL;
@@ -80,11 +82,35 @@ struct CacheProcessor : public Processor {
CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, char *hostname = 0, int host_len = 0);
inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, bool cluster_cache_local,
CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, char *hostname = 0, int host_len = 0);
- inkcoreapi Action *open_write(Continuation *cont, CacheKey *key, bool cluster_cache_local,
- CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, int expected_size = CACHE_EXPECTED_SIZE,
- int options = 0, time_t pin_in_cache = (time_t)0, char *hostname = 0, int host_len = 0);
- inkcoreapi Action *remove(Continuation *cont, CacheKey *key, bool cluster_cache_local,
- CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, bool rm_user_agents = true, bool rm_link = false,
+
+ /** Open a cache reader from an already open writer.
+
+ This is used for partial content on a cache miss to open a reader corresponding to the
+ partial content writer.
+ */
+ inkcoreapi Action* open_read(Continuation* cont, CacheVConnection* writer, HTTPHdr* client_request_hdr);
+
+ Action *open_read_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key,
+ CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, char *hostname = 0, int host_len = 0);
+
+ inkcoreapi Action *open_write(Continuation *cont,
+ CacheKey *key,
+ bool cluster_cache_local,
+ CacheFragType frag_type = CACHE_FRAG_TYPE_NONE,
+ int expected_size = CACHE_EXPECTED_SIZE,
+ int options = 0,
+ time_t pin_in_cache = (time_t) 0,
+ char *hostname = 0, int host_len = 0);
+ Action *open_write_buffer(Continuation *cont, MIOBuffer *buf,
+ CacheKey *key,
+ CacheFragType frag_type = CACHE_FRAG_TYPE_NONE,
+ int options = 0,
+ time_t pin_in_cache = (time_t) 0,
+ char *hostname = 0, int host_len = 0);
+ inkcoreapi Action *remove(Continuation *cont, CacheKey *key,
+ bool cluster_cache_local,
+ CacheFragType frag_type = CACHE_FRAG_TYPE_NONE,
+ bool rm_user_agents = true, bool rm_link = false,
char *hostname = 0, int host_len = 0);
Action *scan(Continuation *cont, char *hostname = 0, int host_len = 0, int KB_per_second = SCAN_KB_PER_SECOND);
#ifdef HTTP_CACHE
@@ -126,6 +152,9 @@ struct CacheProcessor : public Processor {
*/
bool has_online_storage() const;
+ /** Get the target fragment size. */
+ int64_t get_fixed_fragment_size() const;
+
static int IsCacheEnabled();
static bool IsCacheReady(CacheFragType type);
@@ -190,6 +219,57 @@ struct CacheVConnection : public VConnection {
#ifdef HTTP_CACHE
virtual void set_http_info(CacheHTTPInfo *info) = 0;
virtual void get_http_info(CacheHTTPInfo **info) = 0;
+
+ /** Get the boundary string for a multi-part range response.
+ The length of the string is returned in @a len.
+
+ @return A point to the string.
+ */
+ virtual char const* get_http_range_boundary_string(int* len) const = 0;
+
+ /** Get the effective content size.
+
+ This is the amount of actual data based on any range or framing. Effectively this is the
+ value to be passed to the @c VIO while the content length is used in the HTTP header.
+ */
+ virtual int64_t get_effective_content_size() = 0;
+
+ /** Set the origin reported content size.
+
+ This is the content length reported by the origin server and should be considered a hint, not
+ definitive. The object size, as stored in the cache, is the actual amount of data received and
+ cached.
+
+ @note This is the total content length as reported in the HTTP header, not the partial (range based) response size.
+ Also this is the length of the HTTP content, which may differ from the size of the data stream.
+ */
+ virtual void set_full_content_length(int64_t) = 0;
+
+ /** Get the range spec for the response (request ranges modifed by content length).
+ @internal Need better comment - this is the range spec used for a response from ATS to the user agent
+ from cached data. Even better we have potentially 2 response ranges - that from the origin server to
+ ATS and that from ATS to the user agent which are only somewhat similar, depending on what exactly
+ is in the cache at the moment.
+ */
+ virtual HTTPRangeSpec& get_http_range_spec() = 0;
+
+ /// Check if this is HTTP partial content (range request/response).
+ virtual bool is_http_partial_content() = 0;
+
+ /// Get the unchanged ranges.
+ /// @return @c true if the @a result is not empty.
+ /// @internal Currently this just returns the single range that is convex hull of the uncached request.
+ /// Someday we may want to do the exact range spec but we use the type for now because it's easier.
+ virtual bool get_uncached(HTTPRangeSpec& result) = 0;
+
+ /** Set the range for the input (response content).
+ The incoming bytes will be written to this section of the object.
+ @note This range @b must be absolute.
+ @note The range is inclusive.
+ @return The # of bytes in the range.
+ */
+ virtual int64_t set_inbound_range(int64_t min, int64_t max) { return 1 + (max - min); }
+
#endif
virtual bool is_ram_cache_hit() const = 0;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/799a83bb/iocore/cache/I_CacheDefs.h
----------------------------------------------------------------------
diff --git a/iocore/cache/I_CacheDefs.h b/iocore/cache/I_CacheDefs.h
index 93b0cac..df62eca 100644
--- a/iocore/cache/I_CacheDefs.h
+++ b/iocore/cache/I_CacheDefs.h
@@ -21,6 +21,7 @@
limitations under the License.
*/
+#include <vector>
#ifndef _I_CACHE_DEFS_H__
#define _I_CACHE_DEFS_H__
@@ -32,7 +33,7 @@
#define CACHE_ALT_INDEX_DEFAULT -1
#define CACHE_ALT_REMOVED -2
-#define CACHE_DB_MAJOR_VERSION 24
+#define CACHE_DB_MAJOR_VERSION 25
#define CACHE_DB_MINOR_VERSION 0
#define CACHE_DIR_MAJOR_VERSION 18
@@ -126,4 +127,5 @@ typedef CryptoHash CacheKey;
word(2) - tag (lower bits), hosttable hash (upper bits)
word(3) - ram cache hash, lookaside cache
*/
+
#endif // __CACHE_DEFS_H__