You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2015/04/19 19:45:18 UTC
[4/8] trafficserver git commit: First correct range processing
(single range).
First correct range processing (single range).
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/d33b6255
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/d33b6255
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/d33b6255
Branch: refs/heads/ts-974-multi-range-read
Commit: d33b6255f031801ed176b3fb77caea09b6640b37
Parents: ee70849
Author: Alan M. Carroll <so...@yahoo-inc.com>
Authored: Mon Dec 1 11:04:56 2014 -0600
Committer: Alan M. Carroll <so...@yahoo-inc.com>
Committed: Sat Dec 6 11:56:03 2014 -0600
----------------------------------------------------------------------
iocore/cache/Cache.cc | 5 ++
iocore/cache/CacheHttp.cc | 40 ++++++++++
iocore/cache/CacheRead.cc | 152 ++++++++++++++++++++----------------
iocore/cache/I_CacheDefs.h | 29 ++++++-
iocore/cache/P_CacheInternal.h | 68 ++++++++++++++--
5 files changed, 220 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/Cache.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
index 9aece93..de20dc9 100644
--- a/iocore/cache/Cache.cc
+++ b/iocore/cache/Cache.cc
@@ -2751,6 +2751,11 @@ LinterimRead:
io.action = this;
io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
SET_HANDLER(&CacheVC::handleReadDone);
+ {
+ char xt[33];
+ // aio_nbytes / aio_offset need not be int64_t (POSIX aiocb uses size_t / off_t), so cast to match PRId64.
+ Debug("amc", "cache read : key = %s %" PRId64 " bytes at stripe offset = %" PRId64
+ , key.toHexStr(xt), static_cast<int64_t>(io.aiocb.aio_nbytes), static_cast<int64_t>(io.aiocb.aio_offset));
+ }
ink_assert(ink_aio_read(&io) >= 0);
CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
return EVENT_CONT;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/CacheHttp.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheHttp.cc b/iocore/cache/CacheHttp.cc
index 024f5ca..3b86a9b 100644
--- a/iocore/cache/CacheHttp.cc
+++ b/iocore/cache/CacheHttp.cc
@@ -374,6 +374,46 @@ RangeSpec::finalize(uint64_t len)
/*-------------------------------------------------------------------------
-------------------------------------------------------------------------*/
+/** Finalize the range spec against content length @a len and reset offset tracking.
+
+ Returns the result of RangeSpec::finalize(). Only when that succeeds:
+ - an empty spec is treated as the single span [0, len): the offset is reset to 0 and the
+ range index is left untouched;
+ - otherwise tracking starts at range index 0 and at offset @c _single._min.
+ NOTE(review): for a MULTI spec this relies on @c _single holding the first range — confirm
+ against RangeSpec::add.
+*/
+bool
+CacheRange::finalize(uint64_t len)
+{
+ bool zret = super::finalize(len);
+ if (zret) {
+ if (this->isEmpty()) { // pretend it's one range [0..len)
+ _offset = 0;
+ } else {
+ _idx = 0;
+ _offset = _single._min;
+ }
+ // Remembered so getRemnantSize() can bound the pretend single range of an empty spec.
+ _len = len;
+ }
+ return zret;
+}
+
+/** Advance the tracked content offset by @a size bytes and return the new offset.
+
+ EMPTY: the offset simply advances by @a size.
+ SINGLE: the advance is clipped at one past the range's @c _max.
+ MULTI: bytes are consumed range by range; when a range is exhausted the offset jumps to the
+ @c _min of the next range. After the last range is exhausted the index equals the range count
+ and the offset stays one past that range's @c _max.
+*/
+uint64_t
+CacheRange::consume(uint64_t size)
+{
+ switch (_state) {
+ case EMPTY: _offset += size; break;
+ case SINGLE: _offset += std::min(size, (_single._max - _offset) + 1 ); break;
+ case MULTI:
+ // _idx is -1 until finalize() and equals the range count once every range is consumed.
+ while (size && 0 <= _idx && _idx < static_cast<int>(_ranges.size())) {
+ uint64_t r = std::min(size, (_ranges[_idx]._max - _offset) + 1);
+ _offset += r;
+ size -= r;
+ // Step to the next range only if one exists; indexing one past the last range is out of bounds.
+ if (_offset > _ranges[_idx]._max && ++_idx < static_cast<int>(_ranges.size()))
+ _offset = _ranges[_idx]._min;
+ }
+ break;
+ default: break;
+ }
+
+ return _offset;
+}
+
+/*-------------------------------------------------------------------------
+ -------------------------------------------------------------------------*/
+
#else //HTTP_CACHE
CacheHTTPInfoVector::CacheHTTPInfoVector()
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/CacheRead.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheRead.cc b/iocore/cache/CacheRead.cc
index c2caab7..f263d82 100644
--- a/iocore/cache/CacheRead.cc
+++ b/iocore/cache/CacheRead.cc
@@ -634,13 +634,55 @@ Ldone:
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, 0);
LreadMain:
- fragment++;
+ ++fragment;
doc_pos = doc->prefix_len();
+ if (req_rs.isValid()) {
+ doc_pos += req_rs.getOffset() - frag_upper_bound; // used before update!
+ }
+ frag_upper_bound += doc->data_len();
next_CacheKey(&key, &key);
SET_HANDLER(&CacheVC::openReadMain);
return openReadMain(event, e);
}
+/** Move member @c key (and @c fragment) to fragment index @a target.
+
+ Keys are stepped one fragment at a time with next_CacheKey / prev_CacheKey. When moving
+ backwards and @a target is closer to fragment 0 than to the current @c fragment, this restarts
+ from @c earliest_key (treated here as the key of fragment 0) and walks forward instead.
+ On return @c fragment == @a target.
+*/
+void
+CacheVC::update_key_to_frag_idx(int target)
+{
+ if (target < fragment) { // going backwards
+ // Restarting costs 'target' forward steps; walking back costs 'fragment - target' steps.
+ if (target < (fragment - target)) { // faster to go from start
+ fragment = 0;
+ key = earliest_key;
+ } else { // quicker to go from here to there
+ while (target < fragment) {
+ prev_CacheKey(&key, &key);
+ --fragment;
+ }
+ }
+ }
+ // advance to target if we're not already there.
+ while (target > fragment) {
+ next_CacheKey(&key, &key);
+ ++fragment;
+ }
+}
+
+/** Return the index of the fragment that holds content byte @a offset.
+
+ @a frags holds @a count fragment end offsets in increasing order: fragment @c i covers
+ [frags[i-1], frags[i]), where fragment 0 starts at 0 and the last fragment (index @a count)
+ ends at @c doc_len. The answer is therefore the first index whose end offset is greater than
+ @a offset, located here by binary search over [0, count].
+*/
+int
+CacheVC::frag_idx_for_offset(HTTPInfo::FragOffset* frags, int count, uint64_t offset)
+{
+ ink_assert(offset < doc_len);
+ int lo = 0; // lowest candidate fragment index.
+ int hi = count; // highest candidate; the last fragment ends at doc_len > offset, so it always qualifies.
+ while (lo < hi) {
+ int mid = lo + (hi - lo) / 2; // mid < count, so frags[mid] is in bounds.
+ if (offset >= static_cast<uint64_t>(frags[mid]))
+ lo = mid + 1; // fragment mid ends at or before offset; the target is later.
+ else
+ hi = mid; // fragment mid ends after offset; it or an earlier fragment holds it.
+ }
+ return lo;
+}
+
+/* There is a fragment available, decide what to do next.
+ */
int
CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
@@ -649,78 +691,41 @@ CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
int64_t ntodo = vio.ntodo();
int64_t bytes = doc->len - doc_pos;
IOBufferBlock *b = NULL;
- if (seek_to) { // handle do_io_pread
- if (seek_to >= doc_len) {
- vio.ndone = doc_len;
- return calluser(VC_EVENT_EOS);
- }
#ifdef HTTP_CACHE
- HTTPInfo::FragOffset* frags = alternate.get_frag_table();
- if (is_debug_tag_set("cache_seek")) {
- char b[33], c[33];
- Debug("cache_seek", "Seek @ %" PRId64" in %s from #%d @ %" PRId64"/%d:%s",
- seek_to, first_key.toHexStr(b), fragment, doc_pos, doc->len, doc->key.toHexStr(c));
- }
- /* Because single fragment objects can migrate to hang off an alt vector
- they can appear to the VC as multi-fragment when they are not really.
- The essential difference is the existence of a fragment table.
- */
- if (frags) {
- int target = 0;
- HTTPInfo::FragOffset next_off = frags[target];
- int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
- ink_assert(lfi >= 0); // because it's not a single frag doc.
+ if (req_rs.isValid()) {
+ int target = -1; // target fragment index.
+ uint64_t target_offset = req_rs.getOffset();
+ uint64_t lower_bound = frag_upper_bound - doc->data_len();
- /* Note: frag[i].offset is the offset of the first byte past the
- i'th fragment. So frag[0].offset is the offset of the first
- byte of fragment 1. In addition the # of fragments is one
- more than the fragment table length, the start of the last
- fragment being the last offset in the table.
- */
- if (fragment == 0 ||
- seek_to < frags[fragment-1] ||
- (fragment <= lfi && frags[fragment] <= seek_to)
- ) {
- // search from frag 0 on to find the proper frag
- while (seek_to >= next_off && target < lfi) {
- next_off = frags[++target];
- }
- if (target == lfi && seek_to >= next_off) ++target;
- } else { // shortcut if we are in the fragment already
- target = fragment;
+ if (target_offset < lower_bound || frag_upper_bound <= target_offset) {
+ HTTPInfo::FragOffset* frags = alternate.get_frag_table();
+
+ if (is_debug_tag_set("amc")) {
+ char b[33], c[33];
+ Debug("amc", "Seek @ %" PRIu64 " [r#=%d] in %s from #%d @ %" PRIu64 "/%d/%" PRId64 ":%s%s",
+ target_offset, req_rs.getIdx(), first_key.toHexStr(b), fragment, frag_upper_bound, doc->len, doc->total_len, doc->key.toHexStr(c)
+ , (frags ? "" : "no frag table")
+ );
}
- if (target != fragment) {
- // Lread will read the next fragment always, so if that
- // is the one we want, we don't need to do anything
- int cfi = fragment;
- --target;
- while (target > fragment) {
- next_CacheKey(&key, &key);
- ++fragment;
- }
- while (target < fragment) {
- prev_CacheKey(&key, &key);
- --fragment;
- }
- if (is_debug_tag_set("cache_seek")) {
- char target_key_str[33];
- key.toHexStr(target_key_str);
- Debug("cache_seek", "Seek #%d @ %" PRId64" -> #%d @ %" PRId64":%s", cfi, doc_pos, target, seek_to, target_key_str);
- }
- goto Lread;
+ if (frags) {
+ int n_frags = alternate.get_frag_offset_count();
+ target = this->frag_idx_for_offset(frags, n_frags, target_offset);
+ this->update_key_to_frag_idx(target);
+ /// one frag short, because it gets bumped when the fragment is actually read.
+ frag_upper_bound = target > 0 ? frags[target-1] : 0;
+ } else { // we don't support non-monotonic forward requests on objects with no frag table.
+ ink_release_assert(target_offset >= frag_upper_bound);
}
- }
- doc_pos = doc->prefix_len() + seek_to;
- if (fragment) doc_pos -= static_cast<int64_t>(frags[fragment-1]);
- vio.ndone = 0;
- seek_to = 0;
- ntodo = vio.ntodo();
- bytes = doc->len - doc_pos;
- if (is_debug_tag_set("cache_seek")) {
- char target_key_str[33];
- key.toHexStr(target_key_str);
- Debug("cache_seek", "Read # %d @ %" PRId64"/%d for %" PRId64, fragment, doc_pos, doc->len, bytes);
+ goto Lread;
+ } else {
+ // Adjust fragment starting offset to account for target offset.
+ int r_doc_pos = doc->prefix_len() + (target_offset - lower_bound);
+ if (doc_pos < r_doc_pos) {
+ doc_pos = r_doc_pos;
+ bytes = doc->len - doc_pos;
+ }
+ bytes = std::min(bytes, static_cast<int64_t>(req_rs.getRemnantSize()));
}
#endif
}
@@ -730,6 +735,7 @@ CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
return EVENT_CONT;
if ((bytes <= 0) && vio.ntodo() >= 0)
goto Lread;
+ // Do the write to downstream consumers.
if (bytes > vio.ntodo())
bytes = vio.ntodo();
b = new_IOBufferBlock(buf, bytes, doc_pos);
@@ -737,6 +743,7 @@ CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
vio.buffer.writer()->append_block(b);
vio.ndone += bytes;
doc_pos += bytes;
+ req_rs.consume(bytes);
if (vio.ntodo() <= 0)
return calluser(VC_EVENT_READ_COMPLETE);
else {
@@ -857,6 +864,7 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
earliest_key = key;
doc_pos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ // Start the running fragment bound at this fragment's data, as both openReadStartHead paths do;
+ // openReadMain derives the fragment's lower bound as frag_upper_bound - doc->data_len().
+ frag_upper_bound = doc->data_len();
vol->begin_read(this);
if (vol->within_hit_evacuate_window(&earliest_dir) &&
(!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
@@ -1016,6 +1024,10 @@ Lrestart:
/*
This code follows CacheVC::openReadStartEarliest closely,
if you change this you might have to change that.
+
+ This handles the I/O completion of reading the first doc of the object.
+ If there are alternates, we chain to openReadStartEarliest to read the
+ earliest doc.
*/
int
CacheVC::openReadStartHead(int event, Event * e)
@@ -1144,6 +1156,8 @@ CacheVC::openReadStartHead(int event, Event * e)
ink_assert(doc->hlen);
doc_pos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ frag_upper_bound = doc->data_len();
} else {
f.single_fragment = false;
}
@@ -1151,6 +1165,8 @@ CacheVC::openReadStartHead(int event, Event * e)
#endif
{
next_CacheKey(&key, &doc->key);
+ fragment = 1;
+ frag_upper_bound = doc->data_len();
f.single_fragment = doc->single_fragment();
doc_pos = doc->prefix_len();
doc_len = doc->total_len;
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/I_CacheDefs.h
----------------------------------------------------------------------
diff --git a/iocore/cache/I_CacheDefs.h b/iocore/cache/I_CacheDefs.h
index df430f4..ce50958 100644
--- a/iocore/cache/I_CacheDefs.h
+++ b/iocore/cache/I_CacheDefs.h
@@ -203,9 +203,18 @@ struct RangeSpec {
*/
size_t count() const;
- /// If this is a valid, single range specification.
+ /// If this is a valid single range specification.
bool isSingle() const;
+ /// If this is a valid multi range specification.
+ bool isMulti() const;
+
+ /// Test if this contains at least one valid range.
+ bool isValid() const;
+
+ /// Test if this is a valid but empty range spec.
+ bool isEmpty() const;
+
protected:
self& add(uint64_t low, uint64_t high);
};
@@ -221,12 +230,30 @@ RangeSpec::isSingle() const
return SINGLE == _state;
}
+/// True if the spec is in the MULTI state.
+inline bool
+RangeSpec::isMulti() const
+{
+ return MULTI == _state;
+}
+
+/// True if the spec is in the EMPTY state (valid, but holding no ranges).
+inline bool
+RangeSpec::isEmpty() const
+{
+ return EMPTY == _state;
+}
+
inline size_t
RangeSpec::count() const
{
return SINGLE == _state ? 1 : _ranges.size();
}
+/// True if the spec holds at least one range (SINGLE or MULTI); an EMPTY spec is not "valid" here.
+inline bool
+RangeSpec::isValid() const
+{
+ return SINGLE == _state || MULTI == _state;
+}
+
+
inline RangeSpec::Range&
RangeSpec::Range::invalidate()
{
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/P_CacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h
index 5a0b8fc..2de18a3 100644
--- a/iocore/cache/P_CacheInternal.h
+++ b/iocore/cache/P_CacheInternal.h
@@ -235,20 +235,43 @@ extern int good_interim_disks;
/** Range operation tracking.
- This holds a range specification and tracks where in the range the current
- cache operation is.
+ This holds a range specification. It also tracks the current object offset and the individual range.
+
+ For simplification of the logic that uses this class it will pretend to be a single range of
+ the object size if it is empty. To return the correct response we still need to distinguish
+ those two cases.
*/
class CacheRange : public RangeSpec
{
public:
typedef CacheRange self; ///< Self reference type.
+ typedef RangeSpec super; ///< Parent type.
/// Default constructor
- CacheRange() : _offset(0), _idx(-1) { }
+ /// @internal _len is zeroed so getRemnantSize() is well defined before finalize() sets it.
+ CacheRange() : super(), _len(0), _offset(0), _idx(-1) { }
+
+ /// Convert to specific values based on content @a length and reset offset tracking.
+ bool finalize(uint64_t length);
+
+ /// Get the current object offset.
+ uint64_t getOffset() const;
+
+ /// Get the current range index.
+ int getIdx() const;
+
+ /// Get the remaining contiguous bytes for the current range.
+ uint64_t getRemnantSize() const;
+
+ /** Advance @a size bytes in the range spec.
+
+ @return The resulting offset in the object.
+ */
+ uint64_t consume(uint64_t size);
protected:
- uint64_t _offset; ///< Current offset into the range.
- int _idx; ///< Current range index. (< 0 -> not in a range)
+ uint64_t _len; ///< Total object length (set by finalize()).
+ uint64_t _offset; ///< Offset in content.
+ int _idx; ///< Current range index. (< 0 means not in a range)
};
// CacheVC
@@ -384,6 +407,13 @@ struct CacheVC: public CacheVConnection
@return Length of header data used for alternates.
*/
virtual uint32_t load_http_info(CacheHTTPInfoVector* info, struct Doc* doc, RefCountObj * block_ptr = NULL);
+
+ /// Change member @a key to be the key for the @a target 'th fragment.
+ void update_key_to_frag_idx(int target);
+ /// Compute the index of the fragment that contains the byte at content @a offset.
+ /// The table of @a frags and its @a count must be provided.
+ int frag_idx_for_offset(HTTPInfo::FragOffset* frags, int count, uint64_t offset);
+
#endif
virtual bool is_pread_capable();
virtual bool set_pin_in_cache(time_t time_pin);
@@ -473,6 +503,11 @@ struct CacheVC: public CacheVConnection
uint64_t total_len; // total length written and available to write
uint64_t doc_len; // total_length (of the selected alternate for HTTP)
uint64_t update_len;
+ /// The offset in the content of the first byte beyond the end of the current fragment.
+ /// @internal This seems very weird but I couldn't figure out how to keep the more sensible
+ /// lower bound correctly updated.
+ /// The lower bound can be computed by subtracting doc->data_len() from this value.
+ uint64_t frag_upper_bound;
int fragment;
int scan_msec_delay;
CacheVC *write_vc;
@@ -1384,6 +1419,29 @@ local_cache()
return theCache;
}
+/// Current content offset: set by finalize() and advanced by consume().
+inline uint64_t
+CacheRange::getOffset() const
+{
+ return _offset;
+}
+
+/// Current range index; negative when no range is selected (e.g. an empty spec).
+inline int
+CacheRange::getIdx() const
+{
+ return _idx;
+}
+
+/** Bytes left in the current contiguous span, counting the byte at the current offset.
+
+ Empty spec: up to the stored object length. Single: up to and including the range's @c _max.
+ Multi: up to and including the current range's @c _max, or 0 if the index is outside the list.
+ NOTE(review): the unsigned subtraction assumes the offset is at most one past @c _max.
+*/
+inline uint64_t
+CacheRange::getRemnantSize() const
+{
+ uint64_t zret = 0;
+ if (this->isEmpty()) zret = _len - _offset;
+ else if (this->isSingle()) zret = (_single._max - _offset) + 1;
+ else if (this->isMulti() && 0 <= _idx && _idx < static_cast<int>(_ranges.size()))
+ zret = (_ranges[_idx]._max - _offset) + 1;
+ return zret;
+}
+
+
LINK_DEFINITION(CacheVC, opendir_link)
#endif /* _P_CACHE_INTERNAL_H__ */