You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by am...@apache.org on 2015/04/19 19:45:18 UTC

[4/8] trafficserver git commit: First correct range processing (single range).

First correct range processing (single range).


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/d33b6255
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/d33b6255
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/d33b6255

Branch: refs/heads/ts-974-multi-range-read
Commit: d33b6255f031801ed176b3fb77caea09b6640b37
Parents: ee70849
Author: Alan M. Carroll <so...@yahoo-inc.com>
Authored: Mon Dec 1 11:04:56 2014 -0600
Committer: Alan M. Carroll <so...@yahoo-inc.com>
Committed: Sat Dec 6 11:56:03 2014 -0600

----------------------------------------------------------------------
 iocore/cache/Cache.cc          |   5 ++
 iocore/cache/CacheHttp.cc      |  40 ++++++++++
 iocore/cache/CacheRead.cc      | 152 ++++++++++++++++++++----------------
 iocore/cache/I_CacheDefs.h     |  29 ++++++-
 iocore/cache/P_CacheInternal.h |  68 ++++++++++++++--
 5 files changed, 220 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/Cache.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
index 9aece93..de20dc9 100644
--- a/iocore/cache/Cache.cc
+++ b/iocore/cache/Cache.cc
@@ -2751,6 +2751,11 @@ LinterimRead:
   io.action = this;
   io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
   SET_HANDLER(&CacheVC::handleReadDone);
+  {
+    char xt[33];
+    Debug("amc", "cache read : key = %s %" PRId64 " bytes at stripe offset =% " PRId64
+          , key.toHexStr(xt), io.aiocb.aio_nbytes, io.aiocb.aio_offset);
+  }
   ink_assert(ink_aio_read(&io) >= 0);
   CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat);
   return EVENT_CONT;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/CacheHttp.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheHttp.cc b/iocore/cache/CacheHttp.cc
index 024f5ca..3b86a9b 100644
--- a/iocore/cache/CacheHttp.cc
+++ b/iocore/cache/CacheHttp.cc
@@ -374,6 +374,46 @@ RangeSpec::finalize(uint64_t len)
 /*-------------------------------------------------------------------------
   -------------------------------------------------------------------------*/
 
+bool
+CacheRange::finalize(uint64_t len) // Finalize the spec for content length @a len, then set the initial read position.
+{
+  bool zret = super::finalize(len); // tracking state below is only set up if the base class reports success
+  if (zret) {
+    if (this->isEmpty()) { // pretend it's one range [0..len)
+      _offset = 0;
+    } else {
+      _idx = 0; // start in the first range
+      _offset = _single._min; // NOTE(review): _single is used even for a MULTI spec - presumably it mirrors the first range; confirm
+    }
+    _len = len; // kept so getRemnantSize() can size the implicit whole-object range
+  }
+  return zret;
+}
+
+uint64_t
+CacheRange::consume(uint64_t size) // Advance the tracked position by @a size bytes; returns the new object offset.
+{
+  switch (_state) {
+  case EMPTY: _offset += size; break; // no explicit ranges: plain linear advance
+  case SINGLE: _offset += std::min(size, (_single._max - _offset) + 1 ); break; // _max is inclusive; never step past it
+  case MULTI:
+    while (size && _idx < static_cast<int>(_ranges.size())) {
+      uint64_t r = std::min(size, (_ranges[_idx]._max - _offset) + 1); // bytes left in the current range
+      _offset += r;
+      size -= r;
+      if (_offset > _ranges[_idx]._max && ++_idx < static_cast<int>(_ranges.size())) // range done: step on, but only
+        _offset = _ranges[_idx]._min;                                               // read the next start if it exists
+    }
+    break;
+  default: break; // other states track no position
+  }
+
+  return _offset;
+}
+
+/*-------------------------------------------------------------------------
+  -------------------------------------------------------------------------*/
+
 #else //HTTP_CACHE
 
 CacheHTTPInfoVector::CacheHTTPInfoVector()

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/CacheRead.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheRead.cc b/iocore/cache/CacheRead.cc
index c2caab7..f263d82 100644
--- a/iocore/cache/CacheRead.cc
+++ b/iocore/cache/CacheRead.cc
@@ -634,13 +634,55 @@ Ldone:
 Lcallreturn:
   return handleEvent(AIO_EVENT_DONE, 0);
 LreadMain:
-  fragment++;
+  ++fragment;
   doc_pos = doc->prefix_len();
+  if (req_rs.isValid()) {
+    doc_pos += req_rs.getOffset() - frag_upper_bound; // used before update!
+  }
+  frag_upper_bound += doc->data_len();
   next_CacheKey(&key, &key);
   SET_HANDLER(&CacheVC::openReadMain);
   return openReadMain(event, e);
 }
 
+void
+CacheVC::update_key_to_frag_idx(int target) // Step @a key and @a fragment together until fragment == @a target.
+{
+  if (target < fragment) { // going backwards
+    if (target < (fragment - target)) { // faster to go from start: target steps forward vs (fragment - target) back
+      fragment = 0;
+      key = earliest_key; // earliest_key is treated as the key of fragment 0; the loop below walks forward from it
+    } else { // quicker to go from here to there
+      while (target < fragment) {
+        prev_CacheKey(&key, &key); // key is updated in place, one fragment back per call
+        --fragment;
+      }
+    }
+  }
+  // advance to target if we're not already there.
+  while (target > fragment) {
+    next_CacheKey(&key, &key); // key is updated in place, one fragment forward per call
+    ++fragment;
+  }
+}
+
+int
+CacheVC::frag_idx_for_offset(HTTPInfo::FragOffset* frags, int count, uint64_t offset)
+{
+  // Bisect for the first i with offset < frags[i] (frags[i] = first byte past fragment i); none => last fragment, index count.
+  int lo = 0;     // lowest fragment index still possible
+  int hi = count; // highest one: the final fragment has no table entry, so it is the fallback answer
+  ink_assert(offset < doc_len);
+  while (lo < hi) { // the interval shrinks every pass, so this terminates even if the assert is compiled out
+    int mid = lo + (hi - lo) / 2; // always < count, so frags[mid] is a valid entry
+    if (offset >= frags[mid]) lo = mid + 1; // offset lies beyond fragment mid
+    else hi = mid; // offset lies in fragment mid or an earlier one
+  }
+  return lo;
+}
+
+/* There is a fragment available, decide what to do next.
+ */
 int
 CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
 {
@@ -649,78 +691,41 @@ CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
   int64_t ntodo = vio.ntodo();
   int64_t bytes = doc->len - doc_pos;
   IOBufferBlock *b = NULL;
-  if (seek_to) { // handle do_io_pread
-    if (seek_to >= doc_len) {
-      vio.ndone = doc_len;
-      return calluser(VC_EVENT_EOS);
-    }
 #ifdef HTTP_CACHE
-    HTTPInfo::FragOffset* frags = alternate.get_frag_table();
-    if (is_debug_tag_set("cache_seek")) {
-      char b[33], c[33];
-      Debug("cache_seek", "Seek @ %" PRId64" in %s from #%d @ %" PRId64"/%d:%s",
-            seek_to, first_key.toHexStr(b), fragment, doc_pos, doc->len, doc->key.toHexStr(c));
-    }
-    /* Because single fragment objects can migrate to hang off an alt vector
-       they can appear to the VC as multi-fragment when they are not really.
-       The essential difference is the existence of a fragment table.
-    */
-    if (frags) {
-      int target = 0;
-      HTTPInfo::FragOffset next_off = frags[target];
-      int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
-      ink_assert(lfi >= 0); // because it's not a single frag doc.
+  if (req_rs.isValid()) {
+    int target = -1; // target fragment index.
+    uint64_t target_offset = req_rs.getOffset();
+    uint64_t lower_bound = frag_upper_bound - doc->data_len();
 
-      /* Note: frag[i].offset is the offset of the first byte past the
-         i'th fragment. So frag[0].offset is the offset of the first
-         byte of fragment 1. In addition the # of fragments is one
-         more than the fragment table length, the start of the last
-         fragment being the last offset in the table.
-      */
-      if (fragment == 0 ||
-          seek_to < frags[fragment-1] ||
-          (fragment <= lfi && frags[fragment] <= seek_to)
-        ) {
-        // search from frag 0 on to find the proper frag
-        while (seek_to >= next_off && target < lfi) {
-          next_off = frags[++target];
-        }
-        if (target == lfi && seek_to >= next_off) ++target;
-      } else { // shortcut if we are in the fragment already
-        target = fragment;
+    if (target_offset < lower_bound || frag_upper_bound <= target_offset) {
+      HTTPInfo::FragOffset* frags = alternate.get_frag_table();
+
+      if (is_debug_tag_set("amc")) {
+        char b[33], c[33];
+        Debug("amc", "Seek @ %" PRIu64 " [r#=%d] in %s from #%d @ %" PRIu64 "/%d/%" PRId64 ":%s%s",
+              target_offset, req_rs.getIdx(), first_key.toHexStr(b), fragment, frag_upper_bound, doc->len, doc->total_len, doc->key.toHexStr(c)
+              , (frags ? "" : "no frag table")
+          );
       }
-      if (target != fragment) {
-        // Lread will read the next fragment always, so if that
-        // is the one we want, we don't need to do anything
-        int cfi = fragment;
-        --target;
-        while (target > fragment) {
-          next_CacheKey(&key, &key);
-          ++fragment;
-        }
-        while (target < fragment) {
-          prev_CacheKey(&key, &key);
-          --fragment;
-        }
 
-        if (is_debug_tag_set("cache_seek")) {
-          char target_key_str[33];
-          key.toHexStr(target_key_str);
-          Debug("cache_seek", "Seek #%d @ %" PRId64" -> #%d @ %" PRId64":%s", cfi, doc_pos, target, seek_to, target_key_str);
-        }
-        goto Lread;
+      if (frags) {
+        int n_frags = alternate.get_frag_offset_count();
+        target = this->frag_idx_for_offset(frags, n_frags, target_offset);
+        this->update_key_to_frag_idx(target);
+        /// one frag short, because it gets bumped when the fragment is actually read.
+        frag_upper_bound = target > 0 ? frags[target-1] : 0;
+      } else { // we don't support non-monotonic forward requests on objects with no frag table.
+        ink_release_assert(target_offset >= frag_upper_bound);
       }
-    }
-    doc_pos = doc->prefix_len() + seek_to;
-    if (fragment) doc_pos -= static_cast<int64_t>(frags[fragment-1]);
-    vio.ndone = 0;
-    seek_to = 0;
-    ntodo = vio.ntodo();
-    bytes = doc->len - doc_pos;
-    if (is_debug_tag_set("cache_seek")) {
-      char target_key_str[33];
-      key.toHexStr(target_key_str);
-      Debug("cache_seek", "Read # %d @ %" PRId64"/%d for %" PRId64, fragment, doc_pos, doc->len, bytes);
+      goto Lread;
+    } else {
+      // Adjust fragment starting offset to account for target offset.
+      int r_doc_pos = doc->prefix_len() + (target_offset - lower_bound);
+      if (doc_pos < r_doc_pos) {
+        doc_pos = r_doc_pos;
+        bytes = doc->len - doc_pos;
+      }
+      bytes = std::min(bytes, static_cast<int64_t>(req_rs.getRemnantSize()));
     }
 #endif
   }
@@ -730,6 +735,7 @@ CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
     return EVENT_CONT;
   if ((bytes <= 0) && vio.ntodo() >= 0)
     goto Lread;
+  // Do the write to downstream consumers.
   if (bytes > vio.ntodo())
     bytes = vio.ntodo();
   b = new_IOBufferBlock(buf, bytes, doc_pos);
@@ -737,6 +743,7 @@ CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
   vio.buffer.writer()->append_block(b);
   vio.ndone += bytes;
   doc_pos += bytes;
+  req_rs.consume(bytes);
   if (vio.ntodo() <= 0)
     return calluser(VC_EVENT_READ_COMPLETE);
   else {
@@ -857,6 +864,7 @@ CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUS
     earliest_key = key;
     doc_pos = doc->prefix_len();
     next_CacheKey(&key, &doc->key);
+    fragment = 1;
     vol->begin_read(this);
     if (vol->within_hit_evacuate_window(&earliest_dir) &&
         (!cache_config_hit_evacuate_size_limit || doc_len <= (uint64_t)cache_config_hit_evacuate_size_limit)
@@ -1016,6 +1024,10 @@ Lrestart:
 /*
   This code follows CacheVC::openReadStartEarliest closely,
   if you change this you might have to change that.
+
+  This handles the I/O completion of reading the first doc of the object.
+  If there are alternates, we chain to openReadStartEarliest to read the
+  earliest doc.
 */
 int
 CacheVC::openReadStartHead(int event, Event * e)
@@ -1144,6 +1156,8 @@ CacheVC::openReadStartHead(int event, Event * e)
         ink_assert(doc->hlen);
         doc_pos = doc->prefix_len();
         next_CacheKey(&key, &doc->key);
+        fragment = 1;
+        frag_upper_bound = doc->data_len();
       } else {
         f.single_fragment = false;
       }
@@ -1151,6 +1165,8 @@ CacheVC::openReadStartHead(int event, Event * e)
 #endif
     {
       next_CacheKey(&key, &doc->key);
+      fragment = 1;
+      frag_upper_bound = doc->data_len();
       f.single_fragment = doc->single_fragment();
       doc_pos = doc->prefix_len();
       doc_len = doc->total_len;

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/I_CacheDefs.h
----------------------------------------------------------------------
diff --git a/iocore/cache/I_CacheDefs.h b/iocore/cache/I_CacheDefs.h
index df430f4..ce50958 100644
--- a/iocore/cache/I_CacheDefs.h
+++ b/iocore/cache/I_CacheDefs.h
@@ -203,9 +203,18 @@ struct RangeSpec {
   */
   size_t count() const;
 
-  /// If this is a valid, single range specification.
+  /// If this is a valid single range specification.
   bool isSingle() const;
 
+  /// If this is a valid multi range specification.
+  bool isMulti() const;
+
+  /// Test if this contains at least one valid range.
+  bool isValid() const;
+
+  /// Test if this is a valid but empty range spec.
+  bool isEmpty() const;
+
 protected:
   self& add(uint64_t low, uint64_t high);
 };
@@ -221,12 +230,30 @@ RangeSpec::isSingle() const
   return SINGLE == _state;
 }
 
+inline bool
+RangeSpec::isMulti() const
+{
+  return MULTI == _state; // true only while the spec is in the MULTI state
+}
+
+inline bool
+RangeSpec::isEmpty() const
+{
+  return EMPTY == _state; // true only while the spec is in the EMPTY state
+}
+
 inline size_t
 RangeSpec::count() const
 {
   return SINGLE == _state ? 1 : _ranges.size();
 }
 
+inline bool
+RangeSpec::isValid() const
+{
+  return SINGLE == _state || MULTI == _state; // every other state, EMPTY included, reports false
+}
+
 inline RangeSpec::Range&
 RangeSpec::Range::invalidate()
 {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/d33b6255/iocore/cache/P_CacheInternal.h
----------------------------------------------------------------------
diff --git a/iocore/cache/P_CacheInternal.h b/iocore/cache/P_CacheInternal.h
index 5a0b8fc..2de18a3 100644
--- a/iocore/cache/P_CacheInternal.h
+++ b/iocore/cache/P_CacheInternal.h
@@ -235,20 +235,43 @@ extern int good_interim_disks;
 
 /** Range operation tracking.
 
-    This holds a range specification and tracks where in the range the current
-    cache operation is.
+    This holds a range specification. It also tracks the current object offset and the individual range.
+
+    For simplification of the logic that uses this class it will pretend to be a single range of
+    the object size if it is empty. To return the correct response we still need to distinguish
+    those two cases.
 */
 class CacheRange : public RangeSpec
 {
  public:
   typedef CacheRange self; ///< Self reference type.
+  typedef RangeSpec super; ///< Parent type.
 
   /// Default constructor
-  CacheRange() : _offset(0), _idx(-1) { }
+  CacheRange() : super(), _len(0), _offset(0), _idx(-1) { } // _len zeroed: getRemnantSize() reads it for an empty spec
+
+  /// Convert to specific values based on content @a length.
+  bool finalize(uint64_t length);
+
+  /// Get the current object offset
+  uint64_t getOffset() const;
+
+  /// Get the current range index.
+  int getIdx() const;
+
+  /// Get the remaining contiguous bytes for the current range.
+  uint64_t getRemnantSize() const;
+
+  /** Advance @a size bytes in the range spec.
+
+      @return The resulting offset in the object.
+  */
+  uint64_t consume(uint64_t size);
 
  protected:
-  uint64_t _offset; ///< Current offset into the range.
-  int _idx; ///< Current range index. (< 0 -> not in a range)
+  uint64_t _len; ///< Total object length (0 until finalize() stores the real value).
+  uint64_t _offset; ///< Offset in content.
+  int _idx; ///< Current range index. (< 0 means not in a range)
 };
 
 // CacheVC
@@ -384,6 +407,13 @@ struct CacheVC: public CacheVConnection
       @return Length of header data used for alternates.
    */
   virtual uint32_t load_http_info(CacheHTTPInfoVector* info, struct Doc* doc, RefCountObj * block_ptr = NULL);
+
+  /// Change member @a key to be the key for the @a target 'th fragment.
+  void update_key_to_frag_idx(int target);
+  /// Compute the index of the fragment that contains the byte at content @a offset.
+  /// The table of @a frags and its @a count must be provided.
+  int frag_idx_for_offset(HTTPInfo::FragOffset* frags, int count, uint64_t offset);
+
 #endif
   virtual bool is_pread_capable();
   virtual bool set_pin_in_cache(time_t time_pin);
@@ -473,6 +503,11 @@ struct CacheVC: public CacheVConnection
   uint64_t total_len;             // total length written and available to write
   uint64_t doc_len;               // total_length (of the selected alternate for HTTP)
   uint64_t update_len;
+  /// The offset in the content of the first byte beyond the end of the current fragment.
+  /// @internal This seems very weird but I couldn't figure out how to keep the more sensible
+  /// lower bound correctly updated.
+  /// The lower bound can be computed by subtracting doc->data_len() from this value.
+  uint64_t frag_upper_bound;
   int fragment;
   int scan_msec_delay;
   CacheVC *write_vc;
@@ -1384,6 +1419,29 @@ local_cache()
   return theCache;
 }
 
+inline uint64_t
+CacheRange::getOffset() const
+{
+  return _offset; // object offset as last set by finalize() or consume()
+}
+
+inline int
+CacheRange::getIdx() const
+{
+  return _idx; // negative means not in a range (the constructor starts it at -1)
+}
+
+inline uint64_t
+CacheRange::getRemnantSize() const
+{
+  uint64_t zret = 0; // stays 0 for any other state or a range index outside _ranges
+  if (this->isEmpty()) zret = _len - _offset; // empty spec stands for the whole object: bytes left up to _len
+  else if (this->isSingle()) zret = (_single._max - _offset) + 1; // _max is inclusive, hence the + 1
+  else if (this->isMulti() &&  0 <= _idx && _idx < static_cast<int>(_ranges.size()))
+    zret = (_ranges[_idx]._max - _offset) + 1; // bytes left in the current range only, not in later ones
+  return zret;
+}
+
 LINK_DEFINITION(CacheVC, opendir_link)
 
 #endif /* _P_CACHE_INTERNAL_H__ */