You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") is not preserved in this copy.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2009/12/18 17:48:16 UTC

svn commit: r892310 [2/4] - in /incubator/trafficserver/traffic/branches/dev: ./ iocore/aio/ iocore/cache/ iocore/cluster/ iocore/eventsystem/ iocore/net/ libinktomi++/ proxy/ proxy/http2/

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc Fri Dec 18 16:47:37 2009
@@ -29,60 +29,59 @@
 Action *
 Cache::open_read(Continuation * cont, CacheKey * key, CacheFragType type, char *hostname, int host_len)
 {
-
   if (!(CacheProcessor::cache_ready & type)) {
     cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
     return ACTION_RESULT_DONE;
   }
-
   ink_assert(caches[type] == this);
 
   Part *part = key_to_part(key, hostname, host_len);
   Dir result, *last_collision = NULL;
   ProxyMutex *mutex = cont->mutex;
-
-  CacheVC *c = new_CacheVC(cont);
-  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
-  c->vio.op = VIO::READ;
-  c->base_stat = cache_read_active_stat;
-  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
-  c->first_key = c->key = c->earliest_key = *key;
-  c->part = part;
-  if (type == CACHE_FRAG_TYPE_HTTP) {
-    c->f.http_request = 1;
-  }
-
-  CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
-  if (lock) {
-    c->od = part->open_read(key);
-    if (c->od) {
-      // someone is currently writing the document 
-      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
-      if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_CONT)
-        return &c->_action;
-      else
-        return ACTION_RESULT_DONE;
+  OpenDirEntry *od = NULL;
+  CacheVC *c = NULL;
+  {
+    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+    if (!lock || (od = part->open_read(key)) || dir_probe(key, part, &result, &last_collision)) {
+      c = new_CacheVC(cont);
+      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+      c->vio.op = VIO::READ;
+      c->base_stat = cache_read_active_stat;
+      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+      c->first_key = c->key = c->earliest_key = *key;
+      c->part = part;
+      c->frag_type = type;
+      c->od = od;
     }
-    // no writer
-    if (!dir_probe(key, part, &result, &last_collision)) {
-      //release the lock before calling handleEvent??
-      CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
-      cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
-      free_CacheVC(c);
-      return ACTION_RESULT_DONE;
+    if (!lock) {
+      CONT_SCHED_LOCK_RETRY(c);
+      return &c->_action;
     }
-
+    if (!c)
+      goto Lmiss;
+    if (c->od)
+      goto Lwriter;
     c->dir = result;
     c->last_collision = last_collision;
+    switch(c->do_read_call(&c->key)) {
+      case EVENT_DONE: return ACTION_RESULT_DONE;
+      case EVENT_RETURN: goto Lcallreturn;
+      default: return &c->_action;
+    }
   }
-  if (!lock) {
-    CONT_SCHED_LOCK_RETRY(c);
-    return &c->_action;
-  }
-  if (c->do_read(&c->key) == EVENT_CONT)
-    return &c->_action;
-  else
-    return ACTION_RESULT_DONE;  // ram cache hit
+Lmiss:
+  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
+  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
+  return ACTION_RESULT_DONE;
+Lwriter:
+  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+  if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
+    return ACTION_RESULT_DONE;
+  return &c->_action;
+Lcallreturn:
+  if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+    return ACTION_RESULT_DONE;
+  return &c->_action;
 }
 
 #ifdef HTTP_CACHE
@@ -95,59 +94,62 @@
     cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NOT_READY);
     return ACTION_RESULT_DONE;
   }
-
   ink_assert(caches[type] == this);
 
   Part *part = key_to_part(key, hostname, host_len);
   Dir result, *last_collision = NULL;
   ProxyMutex *mutex = cont->mutex;
+  OpenDirEntry *od = NULL;
+  CacheVC *c = NULL;
 
-  CacheVC *c = new_CacheVC(cont);
-  c->first_key = c->key = c->earliest_key = *key;
-  c->part = part;
-  c->vio.op = VIO::READ;
-  c->base_stat = cache_read_active_stat;
-  CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
-  c->request.copy_shallow(request);
-  c->f.http_request = 1;
-  c->params = params;
-
-  CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
-  if (lock) {
-    c->od = part->open_read(key);
-    if (c->od) {
-      //Type-casting the continuation to HttpCacheSM object
-      HttpCacheSM *cachesm = (HttpCacheSM *) cont;
-      //Setting the read_while_write_inprogress flag 
-      cachesm->set_readwhilewrite_inprogress(true);
-
-      // someone is currently writing the document 
-      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
-      if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_CONT)
-        return &c->_action;
-      else
-        return ACTION_RESULT_DONE;
-    }
-    if (!dir_probe(key, part, &result, &last_collision)) {
-      //release the lock before calling handleEvent???
-      CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
-      cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
-      free_CacheVC(c);
-      return ACTION_RESULT_DONE;
+  {
+    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+    if (!lock || (od = part->open_read(key)) || dir_probe(key, part, &result, &last_collision)) {
+      c = new_CacheVC(cont);
+      c->first_key = c->key = c->earliest_key = *key;
+      c->part = part;
+      c->vio.op = VIO::READ;
+      c->base_stat = cache_read_active_stat;
+      CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
+      c->request.copy_shallow(request);
+      c->frag_type = CACHE_FRAG_TYPE_HTTP;
+      c->params = params;
+      c->od = od;
     }
+    if (!lock) {
+      SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+      CONT_SCHED_LOCK_RETRY(c);
+      return &c->_action;
+    }
+    if (!c)
+      goto Lmiss;
+    if (c->od)
+      goto Lwriter;
+    // hit
     c->dir = c->first_dir = result;
     c->last_collision = last_collision;
-
+    SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
+    switch(c->do_read_call(&c->key)) {
+      case EVENT_DONE: return ACTION_RESULT_DONE;
+      case EVENT_RETURN: goto Lcallreturn;
+      default: return &c->_action;
+    }
   }
-  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
-  if (!lock) {
-    CONT_SCHED_LOCK_RETRY(c);
-    return &c->_action;
-  }
-  if (c->do_read(&c->key) == EVENT_CONT)
-    return &c->_action;
-  else
-    return ACTION_RESULT_DONE;  // ram cache hit
+Lmiss:
+  CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
+  cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
+  return ACTION_RESULT_DONE;
+Lwriter:
+  // this is a horrible violation of the interface and should be fixed (FIXME)
+  ((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
+  SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
+  if (c->handleEvent(EVENT_IMMEDIATE, 0) == EVENT_DONE)
+    return ACTION_RESULT_DONE;
+  return &c->_action;
+Lcallreturn:
+  if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+    return ACTION_RESULT_DONE;
+  return &c->_action;
 }
 #endif
 
@@ -173,7 +175,7 @@
   int err = ECACHE_DOC_BUSY;
   CacheVC *w = NULL;
 
-  if (!f.http_request) {
+  if (frag_type != CACHE_FRAG_TYPE_HTTP) {
     ink_assert(od->num_writers == 1);
     w = od->writers.head;
     if (w->start_time > start_time || w->closed < 0) {
@@ -254,7 +256,7 @@
     }
     vector.clear(false);
     if (!write_vc) {
-      Debug("cache_read_agg", "%x: key: %X writer alternate different: %d", this, first_key.word(1), alternate_index);
+      Debug("cache_read_agg", "%x: key: %X %X writer alternate different: %d", this, first_key.word(1), alternate_index);
       od = NULL;
       SET_HANDLER(&CacheVC::openReadStartHead);
       return openReadStartHead(EVENT_IMMEDIATE, 0);
@@ -293,10 +295,8 @@
   if (_action.cancelled)
     return free_CacheVC(this);
   CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
-  if (!lock) {
-    ink_assert(!od);
+  if (!lock)
     VC_SCHED_LOCK_RETRY();
-  }
   if (!od) {
     od = part->open_read(&first_key);
     if (!od) {
@@ -306,7 +306,6 @@
     }
   } else
     ink_debug_assert(od == part->open_read(&first_key));
-
   if (!write_vc) {
     int ret = openReadChooseWriter(event, e);
     if (ret == EVENT_DONE || !write_vc)
@@ -331,13 +330,13 @@
     return openReadStartHead(EVENT_IMMEDIATE, 0);
   }
   // allow reading from unclosed writer for http requests only. 
-  ink_assert(f.http_request || write_vc->closed);
-  if (!write_vc->closed && !write_vc->segment) {
-    if (!cache_config_read_while_writer || !f.http_request)
+  ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
+  if (!write_vc->closed && !write_vc->fragment) {
+    if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP)
       return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
     Debug("cache_read_agg",
-          "%x: key: %X writer: closed:%d, segment:%d, retry: %d",
-          this, first_key.word(1), write_vc->closed, write_vc->segment, writer_lock_retry);
+          "%x: key: %X writer: closed:%d, fragment:%d, retry: %d",
+          this, first_key.word(1), write_vc->closed, write_vc->fragment, writer_lock_retry);
     VC_SCHED_WRITER_RETRY();
   }
 
@@ -350,10 +349,10 @@
   if (!write_vc->io.ok())
     return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
 #ifdef HTTP_CACHE
-  if (f.http_request) {
+  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
     Debug("cache_read_agg",
-          "%x: key: %X http passed stage 1, closed: %d, seg: %d",
-          this, first_key.word(1), write_vc->closed, write_vc->segment);
+          "%x: key: %X http passed stage 1, closed: %d, frag: %d",
+          this, first_key.word(1), write_vc->closed, write_vc->fragment);
     if (!write_vc->alternate.valid())
       return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *) - err);
     alternate.copy(&write_vc->alternate);
@@ -378,11 +377,11 @@
         // header only update. The first_buf of the writer has the
         // document body. 
         Doc *doc = (Doc *) write_vc->first_buf->data();
-        writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), sizeofDoc + doc->hlen);
+        writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
         MUTEX_RELEASE(writer_lock);
         ink_assert(doc_len == doc->data_len());
         length = doc_len;
-        f.single_segment = 1;
+        f.single_fragment = 1;
         docpos = 0;
         earliest_key = key;
         dir_clean(&first_dir);
@@ -406,12 +405,12 @@
 #ifdef HTTP_CACHE
   }
 #endif
-  if (write_vc->segment) {
+  if (write_vc->fragment) {
     doc_len = write_vc->vio.nbytes;
     last_collision = NULL;
     Debug("cache_read_agg",
-          "%x: key: %X closed: %d, segment: %d, len: %d starting first segment",
-          this, first_key.word(1), write_vc->closed, write_vc->segment, doc_len);
+          "%x: key: %X closed: %d, fragment: %d, len: %d starting first fragment",
+          this, first_key.word(1), write_vc->closed, write_vc->fragment, (int)doc_len);
     MUTEX_RELEASE(writer_lock);
     // either a header + body update or a new document
     SET_HANDLER(&CacheVC::openReadStartEarliest);
@@ -421,14 +420,14 @@
   writer_offset = write_vc->offset;
   length = write_vc->length;
   //copy the vector
-  f.single_segment = !write_vc->segment;        //single segment doc
+  f.single_fragment = !write_vc->fragment;        //single fragment doc
   docpos = 0;
   earliest_key = write_vc->earliest_key;
   ink_assert(earliest_key == key);
   doc_len = write_vc->total_len;
   dir_clean(&first_dir);
   dir_clean(&earliest_dir);
-  Debug("cache_read_agg", "%x key: %X %X: single segment read", first_key.word(1), key.word(0));
+  Debug("cache_read_agg", "%x key: %X %X: single fragment read", first_key.word(1), key.word(0));
 
   MUTEX_RELEASE(writer_lock);
   //we've got everything....ready to roll!!
@@ -446,20 +445,20 @@
   NOWARN_UNUSED(e);
   NOWARN_UNUSED(event);
 
-  IOBufferBlock *b = NULL;
-  int ntodo = vio.ntodo();
   cancel_trigger();
+  if (seek_to) {
+    vio.ndone = seek_to;
+    seek_to = 0;
+  }
+  IOBufferBlock *b = NULL;
+  ink64 ntodo = vio.ntodo();
   if (ntodo <= 0)
     return EVENT_CONT;
-  if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
-    return EVENT_CONT;
-
   if (length < (doc_len - vio.ndone)) {
     Debug("cache_read_agg", "truncation %X", earliest_key.word(0));
     if (is_action_tag_set("cache")) {
       ink_release_assert(false);
     }
-//    action_tag_assert("cache", false);
     Warning("Document for %X truncated", earliest_key.word(0));
     calluser(VC_EVENT_ERROR);
     return EVENT_CONT;
@@ -467,22 +466,20 @@
   /* its possible that the user did a do_io_close before 
      openWriteWriteDone was called. */
   if (length > (doc_len - vio.ndone)) {
-    int skip_bytes = length - (doc_len - vio.ndone);
-    iobufferblock_skip(writer_buf, &writer_offset, &length, skip_bytes);
+    ink64 skip_bytes = length - (doc_len - vio.ndone);
+    iobufferblock_skip(writer_buf, &writer_offset, (int*)&length, skip_bytes);
   }
-  int bytes = length;
-  if (length > vio.ntodo())
+  ink64 bytes = length;
+  if (bytes > vio.ntodo())
     bytes = vio.ntodo();
-
-  if (vio.ndone >= doc_len) {
+  if (vio.ndone >= (ink64)doc_len) {
     ink_assert(bytes <= 0);
     // reached the end of the document and the user still wants more 
     calluser(VC_EVENT_EOS);
     return EVENT_DONE;
   }
   b = iobufferblock_clone(writer_buf, writer_offset, bytes);
-  writer_buf = iobufferblock_skip(writer_buf, &writer_offset, &length, bytes);
-
+  writer_buf = iobufferblock_skip(writer_buf, &writer_offset, (int*)&length, bytes);
   vio.buffer.mbuf->append_block(b);
   vio.ndone += bytes;
   if (vio.ntodo() <= 0) {
@@ -496,7 +493,6 @@
   }
 }
 
-
 int
 CacheVC::openReadClose(int event, Event * e)
 {
@@ -509,12 +505,12 @@
       return EVENT_CONT;
     set_io_not_in_progress();
   }
-  MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+  CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
   if (!lock)
     VC_SCHED_LOCK_RETRY();
 #ifdef HIT_EVACUATE
   if (f.hit_evacuate && dir_valid(part, &first_dir) && closed > 0) {
-    if (f.single_segment)
+    if (f.single_fragment)
       part->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
     else if (dir_valid(part, &earliest_dir)) {
       part->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
@@ -536,7 +532,7 @@
     return EVENT_CONT;
   set_io_not_in_progress();
   if (event == AIO_EVENT_DONE && !io.ok())
-    goto Ldone;
+    goto Lerror;
   if (last_collision &&         // no missed lock
       dir_valid(part, &dir))    // object still valid
   {
@@ -547,11 +543,11 @@
         Warning("Middle: Doc checksum does not match for %s", key.string(tmpstring));
       else
         Warning("Middle: Doc magic does not match for %s", key.string(tmpstring));
-      goto Ldone;
+      goto Lerror;
     }
-
     if (doc->key == key) {
-      docpos = sizeofDoc + doc->hlen;
+      fragment++;
+      docpos = doc->prefix_len();
       next_CacheKey(&key, &key);
       SET_HANDLER(&CacheVC::openReadMain);
       return openReadMain(event, e);
@@ -561,42 +557,48 @@
     CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
     if (!lock)
       VC_SCHED_LOCK_RETRY();
-
     if (last_collision && dir_offset(&dir) != dir_offset(last_collision))
       last_collision = 0;       // object has been/is being overwritten
     if (dir_probe(&key, part, &dir, &last_collision)) {
-      do_read(&key);
+      int ret = do_read_call(&key);
+      if (ret == EVENT_RETURN)
+        goto Lcallreturn;
       return EVENT_CONT;
     } else if (write_vc) {
       if (writer_done()) {
         last_collision = NULL;
         while (dir_probe(&earliest_key, part, &dir, &last_collision)) {
           if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
-            Debug("cache_read_agg", "%x: key: %X ReadRead complete: %d", this, first_key.word(1), vio.ndone);
+            Debug("cache_read_agg", "%x: key: %X ReadRead complete: %d", 
+                  this, first_key.word(1), (int)vio.ndone);
             doc_len = vio.ndone;
+            MUTEX_RELEASE(lock);
             calluser(VC_EVENT_EOS);
             return EVENT_DONE;
           }
         }
-        Debug("cache_read_agg", "%x: key: %X ReadRead writer aborted: %d", this, first_key.word(1), vio.ndone);
+        Debug("cache_read_agg", "%x: key: %X ReadRead writer aborted: %d", 
+              this, first_key.word(1), (int)vio.ndone);
+        MUTEX_RELEASE(lock);
         calluser(VC_EVENT_ERROR);
         return EVENT_DONE;
       }
-      Debug("cache_read_agg", "%x: key: %X ReadRead retrying: %d", this, first_key.word(1), vio.ndone);
+      Debug("cache_read_agg", "%x: key: %X ReadRead retrying: %d", this, first_key.word(1), (int)vio.ndone);
       VC_SCHED_WRITER_RETRY();
     }
   }
-Ldone:
-//  action_tag_assert("cache", false);
-  if (is_action_tag_set("cache")) {
+Lerror:
+  if (is_action_tag_set("cache"))
     ink_release_assert(false);
-  }
   // remove the directory entry
   dir_delete_lock(&earliest_key, part, mutex, &earliest_dir);
   char tmpstring[100];
   Warning("Document truncated for %s", earliest_key.string(tmpstring));
   calluser(VC_EVENT_ERROR);
   return EVENT_CONT;
+Lcallreturn:
+  handleEvent(AIO_EVENT_DONE, 0);
+  return EVENT_CONT;
 }
 
 int
@@ -605,18 +607,43 @@
   NOWARN_UNUSED(e);
   NOWARN_UNUSED(event);
 
-  IOBufferBlock *b = NULL;
-  int ntodo = vio.ntodo();
   cancel_trigger();
+  Doc *doc = (Doc *) buf->data();
+  ink64 ntodo = vio.ntodo();
+  ink64 bytes = doc->len - docpos;
+  IOBufferBlock *b = NULL;
+  if (seek_to) { // handle do_io_pread
+    if (seek_to >= doc_len) {
+      vio.ndone = doc_len;
+      calluser(VC_EVENT_EOS);
+      return EVENT_DONE;
+    }
+    if (f.single_fragment) {
+      vio.ndone = seek_to;
+      seek_to = 0;
+    } else {
+      if (seek_to <  doc->data_len()) {
+        vio.ndone += seek_to;
+        seek_to = 0;
+      } else {
+        Frag *frag = doc->frags();
+        // skip to correct key, key is already set to next fragment
+        for (inku32 i = 1; i <= doc->nfrags(); i++) {
+          if (seek_to < frag[i].offset) break;
+          next_CacheKey(&key, &key);
+        }
+        seek_to = 0;
+        vio.ndone = seek_to;
+        goto Lread;
+      }
+    }
+  }
   if (ntodo <= 0)
     return EVENT_CONT;
   if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) // initiate read of first block
     return EVENT_CONT;
-  Doc *doc = (Doc *) buf->data();
-  int bytes = doc->len - docpos;
-  if ((bytes <= 0) && vio.ntodo() >= 0) {
+  if ((bytes <= 0) && vio.ntodo() >= 0)
     goto Lread;
-  }
   if (bytes > vio.ntodo())
     bytes = vio.ntodo();
   b = new_IOBufferBlock(buf, bytes, docpos);
@@ -636,8 +663,8 @@
       goto Lread;
     return EVENT_CONT;
   }
-Lread:{
-    if (vio.ndone >= doc_len) {
+Lread: {
+    if ((inku32)vio.ndone >= doc_len) {
       // reached the end of the document and the user still wants more 
       calluser(VC_EVENT_EOS);
       return EVENT_DONE;
@@ -656,50 +683,54 @@
     }
     if (dir_probe(&key, part, &dir, &last_collision)) {
       SET_HANDLER(&CacheVC::openReadReadDone);
-      do_read(&key);
+      int ret = do_read_call(&key);
+      if (ret == EVENT_RETURN)
+        goto Lcallreturn;
       return EVENT_CONT;
     } else if (write_vc) {
       if (writer_done()) {
         last_collision = NULL;
         while (dir_probe(&earliest_key, part, &dir, &last_collision)) {
           if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
-            Debug("cache_read_agg", "%x: key: %X ReadMain complete: %d", this, first_key.word(1), vio.ndone);
+            Debug("cache_read_agg", "%x: key: %X ReadMain complete: %d", 
+                  this, first_key.word(1), (int)vio.ndone);
             doc_len = vio.ndone;
+            MUTEX_RELEASE(lock);
             calluser(VC_EVENT_EOS);
             return EVENT_DONE;
           }
         }
-        Debug("cache_read_agg", "%x: key: %X ReadMain writer aborted: %d", this, first_key.word(1), vio.ndone);
-        calluser(VC_EVENT_ERROR);
-        return EVENT_DONE;
+        Debug("cache_read_agg", "%x: key: %X ReadMain writer aborted: %d", 
+              this, first_key.word(1), (int)vio.ndone);
+        goto Lerror;
       }
-      Debug("cache_read_agg", "%x: key: %X ReadMain retrying: %d", this, first_key.word(1), vio.ndone);
+      Debug("cache_read_agg", "%x: key: %X ReadMain retrying: %d", this, first_key.word(1), (int)vio.ndone);
       SET_HANDLER(&CacheVC::openReadReadDone);
       VC_SCHED_WRITER_RETRY();
     }
+    Debug("cache_evac", "truncation %X", key.word(0));
+    if (is_action_tag_set("cache"))
+      ink_release_assert(false);
+    Warning("Document for %X truncated", earliest_key.word(0));
+    // remove the directory entry
+    dir_delete(&earliest_key, part, &earliest_dir);
   }
-  Debug("cache_evac", "truncation %X", key.word(0));
-//  action_tag_assert("cache", false);
-  if (is_action_tag_set("cache")) {
-    ink_release_assert(false);
-  }
-  Warning("Document for %X truncated", earliest_key.word(0));
-  // remove the directory entry
-  dir_delete_lock(&earliest_key, part, mutex, &earliest_dir);
+Lerror:
   calluser(VC_EVENT_ERROR);
+  return EVENT_DONE;
+Lcallreturn:
+  handleEvent(AIO_EVENT_DONE, 0);
   return EVENT_CONT;
 }
 
-
 int
 CacheVC::openReadStartEarliest(int event, Event * e)
 {
   NOWARN_UNUSED(e);
   NOWARN_UNUSED(event);
 
-  int err = ECACHE_NO_DOC;
+  int ret = 0;
   Doc *doc = NULL;
-
   cancel_trigger();
   set_io_not_in_progress();
   if (_action.cancelled)
@@ -723,7 +754,6 @@
     if (is_action_tag_set("cache")) {
       ink_release_assert(false);
     }
-
     if (doc->magic == DOC_CORRUPT)
       Warning("Earliest: Doc checksum does not match for %s", key.string(tmpstring));
     else
@@ -739,13 +769,12 @@
   }
   if (!(doc->key == key))
     goto Lcollision;
+  if (part->begin_read_lock(this) < 0)
+    VC_SCHED_LOCK_RETRY();
   // success
   earliest_key = key;
-  docpos = sizeofDoc + doc->hlen;
+  docpos = doc->prefix_len();
   next_CacheKey(&key, &doc->key);
-  if (part->begin_read_lock(this) < 0)
-    VC_SCHED_LOCK_RETRY();
-
 #ifdef HIT_EVACUATE
   if (part->within_hit_evacuate_window(&earliest_dir) &&
       (!cache_config_hit_evacuate_size_limit || doc_len <= cache_config_hit_evacuate_size_limit)) {
@@ -756,27 +785,28 @@
 #endif
   if (write_vc)
     CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
-
   SET_HANDLER(&CacheVC::openReadMain);
   _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
   return EVENT_DONE;
 
-Lcollision:{
+Lcollision: {
     CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
     if (!lock)
       VC_SCHED_LOCK_RETRY();
     if (dir_probe(&key, part, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, part, &earliest_dir, NULL)) {
       dir = earliest_dir;
-      return do_read(&key);
+      if ((ret = do_read_call(&key)) == EVENT_RETURN)
+        goto Lcallreturn;
+      return ret;
     }
     // read has detected that alternate does not exist in the cache. 
     // rewrite the vector.
 #ifdef HTTP_CACHE
-    if (!f.read_from_writer_called && f.http_request) {
+    if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
       // don't want any writers while we are evacuating the vector
       if (!part->open_write(this, false, 1)) {
         Doc *doc1 = (Doc *) first_buf->data();
-        int len = write_vector->get_handles(doc1->hdr, doc1->hlen);
+        inku32 len = write_vector->get_handles(doc1->hdr(), doc1->hlen);
         ink_assert(len == doc1->hlen && write_vector->count() > 0);
         write_vector->remove(alternate_index, true);
         // if the vector had one alternate, delete it's directory entry
@@ -813,10 +843,12 @@
             od->move_resident_alt = 1;
             od->single_doc_key = doc1->key;
             dir_assign(&od->single_doc_dir, &dir);
-            dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(1));
+            dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
           }
           SET_HANDLER(&CacheVC::openReadVecWrite);
-          return do_write();
+          if ((ret = do_write_call()) == EVENT_RETURN)
+            goto Lcallreturn;
+          return ret;
         }
       }
     }
@@ -827,8 +859,10 @@
   if (od)
     part->close_write_lock(this);
   CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
-  _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -err);
+  _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *) -ECACHE_NO_DOC);
   return free_CacheVC(this);
+Lcallreturn:
+  return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
 }
 
 // create the directory entry after the vector has been evacuated
@@ -851,7 +885,7 @@
 
   if (io.ok()) {
     ink_assert(f.evac_vector);
-    ink_assert(f.http_request);
+    ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
     ink_assert(!buf.m_ptr);
     f.evac_vector = false;
     last_collision = NULL;
@@ -890,6 +924,9 @@
 int
 CacheVC::openReadStartHead(int event, Event * e)
 {
+  NOWARN_UNUSED(e);
+  NOWARN_UNUSED(event);
+
   int err = ECACHE_NO_DOC;
   Doc *doc = NULL;
   cancel_trigger();
@@ -919,7 +956,7 @@
       Warning("Head: Doc checksum does not match for %s", key.string(tmpstring));
     else
       Warning("Head : Doc magic does not match for %s", key.string(tmpstring));
-    //remove the directory entry
+    // remove the dir entry
     dir_delete(&key, part, &dir);
     // try going through the directory entries again
     // in case the dir entry we deleted doesnt correspond
@@ -930,8 +967,6 @@
   }
   if (!(doc->first_key == key))
     goto Lcollision;
-
-
   if (f.lookup) {
     CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
     _action.continuation->handleEvent(CACHE_EVENT_LOOKUP, 0);
@@ -940,11 +975,11 @@
   earliest_dir = dir;
 #ifdef HTTP_CACHE
   CacheHTTPInfo *alternate_tmp;
-  if (f.http_request) {
+  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
     ink_assert(doc->hlen);
     if (!doc->hlen)
       goto Ldone;
-    if (vector.get_handles(doc->hdr, doc->hlen) != doc->hlen) {
+    if (vector.get_handles(doc->hdr(), doc->hlen) != doc->hlen) {
       if (buf) {
         Note("OpenReadHead failed for cachekey %X : vector inconsistency with %d", key.word(0), doc->hlen);
         dir_delete(&key, part, &dir);
@@ -977,31 +1012,31 @@
     alternate.object_key_get(&key);
     doc_len = alternate.object_size_get();
     if (key == doc->key) {      // is this my data?
-      f.single_segment = doc->single_segment();
-      ink_assert(f.single_segment);     // otherwise need to read earliest
+      f.single_fragment = doc->single_fragment();
+      ink_assert(f.single_fragment);     // otherwise need to read earliest
       ink_assert(doc->hlen);
-      docpos = sizeofDoc + doc->hlen;
+      docpos = doc->prefix_len();
       next_CacheKey(&key, &doc->key);
     } else {
-      f.single_segment = false;
+      f.single_fragment = false;
     }
   } else
 #endif
   {
-    // non-http docs have the total len set in the first segment
+    // non-http docs have the total len set in the first fragment
     if (doc->hlen) {
       ink_debug_assert(!"Cache::openReadStartHead non-http request" " for http doc");
       err = -ECACHE_BAD_READ_REQUEST;
       goto Ldone;
     }
     next_CacheKey(&key, &doc->key);
-    f.single_segment = doc->single_segment();
-    docpos = sizeofDoc + doc->hlen;
+    f.single_fragment = doc->single_fragment();
+    docpos = doc->prefix_len();
     doc_len = doc->total_len;
   }
   // the first fragment might have been gc'ed. Make sure the first
   // fragment is there before returning CACHE_EVENT_OPEN_READ
-  if (!f.single_segment) {
+  if (!f.single_fragment) {
     first_buf = buf;
     buf = NULL;
     earliest_key = key;
@@ -1021,7 +1056,7 @@
   first_buf = buf;
   if (part->begin_read_lock(this) < 0)
     VC_SCHED_LOCK_RETRY();
-  // success
+
   SET_HANDLER(&CacheVC::openReadMain);
   _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
   return EVENT_DONE;
@@ -1042,12 +1077,16 @@
         goto Ldone;
       }
       od = cod;
+      MUTEX_RELEASE(lock);
       SET_HANDLER(&CacheVC::openReadFromWriter);
       return handleEvent(EVENT_IMMEDIATE, 0);
     }
     if (dir_probe(&key, part, &dir, &last_collision)) {
       first_dir = dir;
-      return do_read(&key);
+      int ret = do_read_call(&key);
+      if (ret == EVENT_RETURN)
+        goto Lcallreturn;
+      return ret;
     }
   }
 Ldone:
@@ -1059,4 +1098,6 @@
     _action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *) -err);
   }
   return free_CacheVC(this);
+Lcallreturn:
+  return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
 }

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc Fri Dec 18 16:47:37 2009
@@ -28,13 +28,20 @@
 #define HDR_PTR_SIZE            (sizeof(inku64))
 #define HDR_PTR_ALIGNMENT_MASK  ((HDR_PTR_SIZE) - 1L)
 
-#define CACHE_AGGREGATE_WRITE 1
-#define MAX_AGG_LEN (256*1024)
-
-extern int cache_config_max_agg_delay;
-extern int cache_config_check_disk_idle;
 extern int cache_config_agg_write_backlog;
 
+static inline int power_of_2(inku32 x) {
+  while (x) {
+    if (x & 1) {
+      if (x >> 1) 
+        return 0;
+      return 1;
+    }
+    x >>= 1; 
+  }
+  return 1;
+}
+
 // Given a key, finds the index of the alternate which matches
 // used to get the alternate which is actually present in the document
 #ifdef HTTP_CACHE
@@ -70,79 +77,82 @@
   cancel_trigger();
   if (od->reading_vec || od->writing_vec)
     VC_SCHED_LOCK_RETRY();
+  int ret = 0;
+  {
+    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+    if (!lock || od->writing_vec)
+      VC_SCHED_LOCK_RETRY();
 
-  CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
-  if (!lock || od->writing_vec)
-    VC_SCHED_LOCK_RETRY();
-
-  int vec = alternate.valid();
-  if (f.update) {
-    // all Update cases. Need to get the alternate index.
-    alternate_index = get_alternate_index(write_vector, update_key);
-    Debug("cache_update", "updating alternate index %d", alternate_index);
-    // if its an alternate delete
-    if (!vec) {
-      ink_assert(!total_len);
-      if (alternate_index >= 0) {
-        write_vector->remove(alternate_index, true);
-        alternate_index = CACHE_ALT_REMOVED;
-        if (!write_vector->count())
-          dir_delete(&first_key, part, &od->first_dir);
-      }
-      // the alternate is not there any more. somebody might have
-      // deleted it. Just close this writer
-      if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
-        SET_HANDLER(&CacheVC::openWriteCloseDir);
-        return openWriteCloseDir(EVENT_IMMEDIATE, 0);
+    int vec = alternate.valid();
+    if (f.update) {
+      // all Update cases. Need to get the alternate index.
+      alternate_index = get_alternate_index(write_vector, update_key);
+      Debug("cache_update", "updating alternate index %d", alternate_index);
+      // if its an alternate delete
+      if (!vec) {
+        ink_assert(!total_len);
+        if (alternate_index >= 0) {
+          write_vector->remove(alternate_index, true);
+          alternate_index = CACHE_ALT_REMOVED;
+          if (!write_vector->count())
+            dir_delete(&first_key, part, &od->first_dir);
+        }
+        // the alternate is not there any more. somebody might have
+        // deleted it. Just close this writer
+        if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
+          SET_HANDLER(&CacheVC::openWriteCloseDir);
+          return openWriteCloseDir(EVENT_IMMEDIATE, 0);
+        }
       }
+      if (update_key == od->single_doc_key && (total_len || !vec))
+        od->move_resident_alt = 0;
     }
-    if (update_key == od->single_doc_key && (total_len || !vec))
-      od->move_resident_alt = 0;
-  }
-  if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
-
-    if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
-      od->move_resident_alt = 0;
+    if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
+      if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0)
+        od->move_resident_alt = 0;
+      write_vector->remove(0, true);
+    }
+    if (vec)
+      alternate_index = write_vector->insert(&alternate, alternate_index);
 
-    write_vector->remove(0, true);
-  }
-  if (vec)
-    alternate_index = write_vector->insert(&alternate, alternate_index);
-
-  if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
-    Doc *doc = (Doc *) first_buf->data();
-    int small_doc = doc->data_len() < cache_config_alt_rewrite_max_size;
-    int have_res_alt = doc->key == od->single_doc_key;
-    // if the new alternate is not written with the vector
-    // then move the old one with the vector
-    // if its a header only update move the resident alternate
-    // with the vector.
-    // We are sure that the body of the resident alternate that we are
-    // rewriting has not changed and the alternate is not being deleted,
-    // since we set od->move_resident_alt  to 0 in that case
-    // (in updateVector)
-    if (small_doc && have_res_alt && (segment || (f.update && !total_len))) {
-      // for multiple segment document, we must have done
-      // CacheVC:openWriteCloseDataDone
-      ink_assert(!segment || f.data_done);
-      od->move_resident_alt = 0;
-      f.rewrite_resident_alt = 1;
-      write_len = doc->data_len();
-      Debug("cache_update_alt",
-            "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.word(0), first_key.word(0));
+    if (od->move_resident_alt && first_buf._ptr() && !od->has_multiple_writers()) {
+      Doc *doc = (Doc *) first_buf->data();
+      int small_doc = (ink64)doc->data_len() < (ink64)cache_config_alt_rewrite_max_size;
+      int have_res_alt = doc->key == od->single_doc_key;
+      // if the new alternate is not written with the vector
+      // then move the old one with the vector
+      // if its a header only update move the resident alternate
+      // with the vector.
+      // We are sure that the body of the resident alternate that we are
+      // rewriting has not changed and the alternate is not being deleted,
+      // since we set od->move_resident_alt  to 0 in that case
+      // (in updateVector)
+      if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
+        // for multiple fragment document, we must have done
+        // CacheVC:openWriteCloseDataDone
+        ink_assert(!fragment || f.data_done);
+        od->move_resident_alt = 0;
+        f.rewrite_resident_alt = 1;
+        write_len = doc->data_len();
+        Debug("cache_update_alt",
+              "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.word(0), first_key.word(0));
+      }
     }
+    od->writing_vec = 1;
+    f.use_first_key = 1;
+    SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
+    ret = do_write_call();
   }
-  od->writing_vec = 1;
-  f.use_first_key = 1;
-  SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
-  return do_write();
+  if (ret == EVENT_RETURN)
+    return handleEvent(AIO_EVENT_DONE, 0);
+  return ret;
 }
 #endif
 /*
    The following fields of the CacheVC are used when writing down a fragment.
    Make sure that each of the fields is set to a valid value before calling
    this function
-   - f.http_request. Checked to see if a vector needs to be marshalled.
+   - frag_type. Checked to see if a vector needs to be marshalled.
    - f.use_first_key. To decide if the vector should be marshalled and to set
      the doc->key to the appropriate key (first_key or earliest_key)
    - f.evac_vector. If set, the writer is pushed in the beginning of the 
@@ -154,7 +164,7 @@
    - f.rewrite_resident_alt. The resident alternate is rewritten.
    - f.update. Used only if the write_vector needs to be written to disk. 
      Used to set the length of the alternate to total_len.
-   - write_vector. Used only if f.http_request && 
+   - write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP && 
      (f.use_fist_key || f.evac_vector) is set. Write_vector is written to disk
    - alternate_index. Used only if write_vector needs to be written to disk.
      Used to find out the VC's alternate in the write_vector and set its 
@@ -182,20 +192,25 @@
   // plain write case
   ink_assert(!trigger);
 #ifdef HTTP_CACHE
-  if (f.http_request && (f.use_first_key || f.evac_vector)) {
+  if (frag_type == CACHE_FRAG_TYPE_HTTP && (f.use_first_key || f.evac_vector)) {
     ink_assert(od->writing_vec);
-    vec_len = write_vector->marshal_length();
-    ink_assert(vec_len > 0);
+    header_len = write_vector->marshal_length();
+    ink_assert(header_len > 0);
   } else
 #endif
-    vec_len = 0;
+    header_len = header_to_write_len;
+  if (f.use_first_key && fragment) {
+    frag_len = (fragment-1) * sizeof(Frag);
+  } else
+    frag_len = 0;
   set_agg_write_in_progress();
   POP_HANDLER;
-  agg_len = round_to_approx_size(write_len + vec_len + sizeofDoc);
+  agg_len = round_to_approx_size(write_len + header_len + frag_len + sizeofDoc);
   part->agg_todo_size += agg_len;
   ink_assert(agg_len <= AGG_SIZE);
-  bool agg_error = (agg_len > AGG_SIZE ||
-                    (!f.readers && (part->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
+  bool agg_error = 
+    (agg_len > AGG_SIZE ||
+     (!f.readers && (part->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
 #ifdef CACHE_AGG_FAIL_RATE
   agg_error = agg_error || ((inku32) mutex->thread_holding->generator.random() <
                             (inku32) (UINT_MAX * CACHE_AGG_FAIL_RATE));
@@ -215,10 +230,8 @@
     part->agg.push(this);
   else
     part->agg.enqueue(this);
-
-  if (!part->is_io_in_progress()) {
-    part->aggWrite(EVENT_IMMEDIATE, 0);
-  }
+  if (!part->is_io_in_progress())
+    return part->aggWrite(event, this);
   return EVENT_CONT;
 }
 
@@ -279,7 +292,7 @@
     // we can't evacuate anything between header->write_pos and
     // header->write_pos + AGG_SIZE.
     int ps = offset_to_part_offset(this, header->write_pos + AGG_SIZE);
-    int pe = offset_to_part_offset(this, header->write_pos + 2 * EVAC_SIZE + (len / PIN_SCAN_EVERY));
+    int pe = offset_to_part_offset(this, header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
     int part_end_offset = offset_to_part_offset(this, len + skip);
     int before_end_of_part = pe < part_end_offset;
     Debug("cache_evac", "scan %d %d", ps, pe);
@@ -303,63 +316,20 @@
   }
 }
 
-int
-PartCallback::aggWriteDone(int event, Event * e)
-{
-  NOWARN_UNUSED(e);
-  NOWARN_UNUSED(event);
-
-  if (trigger) {
-    trigger->cancel_action();
-    trigger = NULL;
-  }
-
-  CacheVC *c;
-  Queue<CacheVC> not_done;
-  EThread *t = mutex->thread_holding;
-  while ((c = (CacheVC *) write_done.dequeue())) {
-    if (!c->mutex->is_thread() || c->mutex->thread_holding == t) {
-      CACHE_TRY_LOCK(lock, c->mutex, t);
-      if (!lock) {
-        not_done.enqueue(c);
-        continue;
-      }
-      c->handleEvent(AIO_EVENT_DONE, 0);
-    } else {
-      not_done.enqueue(c);
-      continue;
-    }
-  }
-  if (not_done.head) {
-    write_done = not_done;
-    if (write_done.head->mutex->is_thread() && write_done.head->mutex->thread_holding != t)
-      trigger = write_done.head->mutex->thread_holding->schedule_imm(this);
-    else
-      trigger = eventProcessor.schedule_in(this, MUTEX_RETRY_DELAY);
-    return EVENT_CONT;
-  }
-  return EVENT_DONE;
-}
-
 /* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
    DON'T schedule any events on this thread using VC_SCHED_XXX or
    mutex->thread_holding->schedule_xxx_local(). ALWAYS use 
    eventProcessor.schedule_xxx(). 
    */
 int
-Part::aggWriteDone(int event, Event * e)
+Part::aggWriteDone(int event, Event *e)
 {
   NOWARN_UNUSED(e);
   NOWARN_UNUSED(event);
 
   cancel_trigger();
-  ProxyMutex *dir_sync_lock;
-  if (dir_sync_waiting)
-    dir_sync_lock = cacheDirSync->mutex;
-  else
-    dir_sync_lock = mutex;
 
-  CACHE_TRY_LOCK(lock, dir_sync_lock, mutex->thread_holding);
+  CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
   if (!lock) {
     // INKqa10347
     // Race condition between cacheDirSync and the part when setting the
@@ -367,22 +337,25 @@
     eventProcessor.schedule_in(this, MUTEX_RETRY_DELAY);
     return EVENT_CONT;
   }
-
   if (io.ok()) {
     header->last_write_pos = header->write_pos;
     header->write_pos += io.aiocb.aio_nbytes;
-    Debug("cache_agg", "Dir %s, Write: %llu, last Write: %llu\n", hash_id, header->write_pos, header->last_write_pos);
+    ink_assert(header->write_pos >= start);
+    Debug("cache_agg", "Dir %s, Write: %llu, last Write: %llu\n", 
+          hash_id, header->write_pos, header->last_write_pos);
     ink_assert(header->write_pos == header->agg_pos);
-    if (header->write_pos + EVAC_SIZE > scan_pos)
+    if (header->write_pos + EVACUATION_SIZE > scan_pos)
       periodic_scan();
-
     agg_buf_pos = 0;
     header->write_serial++;
   } else {
     // delete all the directory entries that we inserted
     // for fragments is this aggregation buffer
     Debug("cache_disk_error", "Write error on disk %s\n \
-              write range : [%llu - %llu bytes]  [%llu - %llu blocks] \n", hash_id, io.aiocb.aio_offset, io.aiocb.aio_offset + io.aiocb.aio_nbytes, io.aiocb.aio_offset / 512, (io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
+              write range : [%llu - %llu bytes]  [%llu - %llu blocks] \n", 
+          hash_id, io.aiocb.aio_offset, io.aiocb.aio_offset + io.aiocb.aio_nbytes, 
+          io.aiocb.aio_offset / INK_BLOCK_SIZE, 
+          (io.aiocb.aio_offset + io.aiocb.aio_nbytes) / INK_BLOCK_SIZE);
     Dir del_dir;
     dir_clear(&del_dir);
     for (int done = 0; done < agg_buf_pos;) {
@@ -393,31 +366,25 @@
     }
     agg_buf_pos = 0;
   }
-
   set_io_not_in_progress();
   SET_HANDLER(&Part::aggWrite);
-
   if (dir_sync_waiting) {
     dir_sync_waiting = 0;
     cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
   }
-
   if (agg.head)
-    return aggWrite(EVENT_IMMEDIATE, e);
+    return aggWrite(event, e);
   return EVENT_CONT;
 }
 
-extern volatile int cachewrite_buf_data;
-
 CacheVC *
 new_DocEvacuator(int nbytes, Part * part)
 {
   CacheVC *c = new_CacheVC(part);
   ProxyMutex *mutex = part->mutex;
   c->base_stat = cache_evacuate_active_stat;
-
   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
-  c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes), MEMALIGNED);
+  c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
   c->part = part;
   c->f.evacuator = 1;
   c->earliest_key = zero_key;
@@ -449,9 +416,9 @@
     goto Lcollision;
 #ifdef HTTP_CACHE
   alternate_tmp = 0;
-  if (doc->hlen) {
-    //its an http document
-    if (vector.get_handles(doc->hdr, doc->hlen) != doc->hlen) {
+  if (doc->ftype == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
+    // its an http document
+    if (vector.get_handles(doc->hdr(), doc->hlen) != doc->hlen) {
       Note("bad vector detected during evacuation");
       goto Ldone;
     }
@@ -481,11 +448,13 @@
     return free_CacheVC(this);
   }
   return EVENT_CONT;
-
 Lcollision:
-  if (dir_probe(&first_key, part, &dir, &last_collision))
-    return do_read(&first_key);
-
+  if (dir_probe(&first_key, part, &dir, &last_collision)) {
+    int ret = do_read_call(&first_key);
+    if (ret == EVENT_RETURN)
+      return handleEvent(AIO_EVENT_DONE, 0);
+    return ret;
+  }
 Ldone:
   dir_lookaside_remove(&earliest_key, part);
   return free_CacheVC(this);
@@ -508,10 +477,10 @@
   for (; b; b = b->link.next) {
     if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
 
-      // If the document is single segment (although not tied to the vector),
+      // If the document is single fragment (although not tied to the vector),
       // then we don't have to put the directory entry in the lookaside
       // buffer. But, we have no way of finding out if the document is
-      // single segment. doc->single_segment() can be true for a multiple
+      // single fragment. doc->single_fragment() can be true for a multiple
       // fragment document since total_len and doc->len could be equal at
       // the time we write the fragment down. To be on the safe side, we
       // only overwrite the entry in the directory if its not a head.
@@ -575,7 +544,10 @@
             dir_lookaside_insert(b, part, &earliest_dir);
             // read the vector
             SET_HANDLER(&CacheVC::evacuateReadHead);
-            return do_read(&first_key);
+            int ret = do_read_call(&first_key);
+            if (ret == EVENT_RETURN)
+              return handleEvent(AIO_EVENT_DONE, 0);
+            return ret;
           }
         }
       }
@@ -586,7 +558,7 @@
 }
 
 int
-evacuate_segments(CacheKey * key, CacheKey * earliest_key, int force, Part * part)
+evacuate_fragments(CacheKey * key, CacheKey * earliest_key, int force, Part * part)
 {
   Dir dir, *last_collision = 0;
   int i = 0;
@@ -615,7 +587,7 @@
     if (force)
       b->f.readers = 0;
     Debug("cache_evac",
-          "next segment %X Earliest: %X offset %d phase %d force %d",
+          "next fragment %X Earliest: %X offset %d phase %d force %d",
           (int) key->word(0), (int) earliest_key->word(0), (int) dir_offset(&dir), (int) dir_phase(&dir), force);
   }
   return i;
@@ -639,7 +611,6 @@
   ink_assert(evacuator->agg_len <= AGG_SIZE);
   agg.insert(evacuator, after);
   return aggWrite(event, e);
-
 }
 
 int
@@ -648,9 +619,8 @@
   NOWARN_UNUSED(e);
 
   cancel_trigger();
-  if (event != AIO_EVENT_DONE) {
+  if (event != AIO_EVENT_DONE)
     return EVENT_DONE;
-  }
   ink_assert(is_io_in_progress());
   set_io_not_in_progress();
   ink_debug_assert(mutex->thread_holding == this_ethread());
@@ -717,11 +687,12 @@
   // Cache::open_write).
   if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
     next_CacheKey(&next_key, &doc->key);
-    evacuate_segments(&next_key, &doc_evacuator->earliest_key, !b->f.readers, this);
+    evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->f.readers, this);
   }
   return evacuateWrite(doc_evacuator, event, e);
 Ldone:
   free_CacheVC(doc_evacuator);
+  doc_evacuator = 0;
   return aggWrite(event, e);
 }
 
@@ -733,7 +704,6 @@
   int si = dir_offset_evac_bucket(s);
   int ei = dir_offset_evac_bucket(e);
 
-  //  Debug("cache_evac", "evac_range %d %d %d %d", si, ei, s, e);
   for (int i = si; i <= ei; i++) {
     EvacuationBlock *b = evacuate[i].head;
     EvacuationBlock *first = 0;
@@ -741,8 +711,6 @@
     for (; b; b = b->link.next) {
       int offset = dir_offset(&b->dir);
       int phase = dir_phase(&b->dir);
-      //      Debug("cache_evac", "evac_range test %X %d %d",
-      //      b->key.word(0), offset, b->f.done);
       if (offset >= s && offset < e && !b->f.done && phase == evac_phase)
         if (offset < first_offset) {
           first = b;
@@ -782,9 +750,9 @@
     Doc *doc = (Doc *) p;
     IOBufferBlock *res_alt_blk = 0;
 
-    int seglen = vc->write_len + vc->vec_len + sizeofDoc;
-    ink_assert(!vc->f.http_request || seglen != sizeofDoc);
-    int writelen = round_to_approx_size(seglen);
+    inku32 len = vc->write_len + vc->header_len + vc->frag_len + sizeofDoc;
+    ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeofDoc);
+    int writelen = round_to_approx_size(len);
     // update copy of directory entry for this document
     dir_set_approx_size(&vc->dir, writelen);
     dir_set_offset(&vc->dir, offset_to_part_offset(part, o));
@@ -793,8 +761,10 @@
 
     // fill in document header
     doc->magic = DOC_MAGIC;
-    doc->len = seglen;
-    doc->hlen = vc->vec_len;
+    doc->len = len;
+    doc->hlen = vc->header_len;
+    doc->ftype = vc->frag_type;
+    doc->flen = vc->frag_len;
     doc->total_len = vc->total_len;
     doc->first_key = vc->first_key;
     doc->sync_serial = part->header->sync_serial;
@@ -818,7 +788,7 @@
       dir_set_head(&vc->dir, true);
     } else {
       doc->key = vc->key;
-      dir_set_head(&vc->dir, !vc->segment);
+      dir_set_head(&vc->dir, !vc->fragment);
     }
 
 #ifdef HTTP_CACHE
@@ -831,28 +801,31 @@
     }
 #endif
     // update the new_info object_key, and total_len and dirinfo
-#ifdef HTTP_CACHE
-    if (vc->vec_len) {
+    if (vc->header_len) {
       ink_debug_assert(vc->f.use_first_key);
-      ink_debug_assert(vc->write_vector->count() > 0);
-      if (!vc->f.update && !vc->f.evac_vector) {
-        ink_debug_assert(!(vc->first_key == zero_key));
-        CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
-        http_info->object_size_set(vc->total_len);
-      }
-      // update + data_written =>  Update case (b)
-      // need to change the old alternate's object length
-      if (vc->f.update && vc->total_len) {
-        CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
-        http_info->object_size_set(vc->total_len);
-      }
-      ink_assert(!(((unsigned long) &doc->hdr[0]) & HDR_PTR_ALIGNMENT_MASK));
-      ink_assert(vc->vec_len == vc->write_vector->marshal(&doc->hdr[0], vc->vec_len));
-      // the single segment flag is not used in the write call.
+#ifdef HTTP_CACHE
+      if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
+        ink_debug_assert(vc->write_vector->count() > 0);
+        if (!vc->f.update && !vc->f.evac_vector) {
+          ink_debug_assert(!(vc->first_key == zero_key));
+          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+          http_info->object_size_set(vc->total_len);
+        }
+        // update + data_written =>  Update case (b)
+        // need to change the old alternate's object length
+        if (vc->f.update && vc->total_len) {
+          CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
+          http_info->object_size_set(vc->total_len);
+        }
+        ink_assert(!(((unsigned long) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
+        ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
+      } else
+#endif
+        memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
+      // the single fragment flag is not used in the write call.
       // putting it in for completeness.
-      vc->f.single_segment = doc->single_segment();
+      vc->f.single_fragment = doc->single_fragment();
     }
-#endif
     // move data
     if (vc->write_len) {
       {
@@ -862,31 +835,29 @@
       }
 #ifdef HTTP_CACHE
       if (vc->f.rewrite_resident_alt)
-        iobufferblock_memcpy(&doc->hdr[vc->vec_len], vc->write_len, res_alt_blk, 0);
+        iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
       else
 #endif
-        iobufferblock_memcpy(&doc->hdr[vc->vec_len], vc->write_len, vc->blocks, vc->offset);
+        iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks, vc->offset);
 #ifdef VERIFY_JTEST_DATA
-      if (f.use_first_key && vec_len) {
+      if (f.use_first_key && header_len) {
         int ib = 0, xd = 0;
         char xx[500];
         new_info.request_get().url_get().print(xx, 500, &ib, &xd);
         char *x = xx;
         for (int q = 0; q < 3; q++)
           x = strchr(x + 1, '/');
-        ink_assert(!memcmp(&doc->hdr[vec_len], x, ib - (x - xx)));
+        ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
       }
 #endif
 
     }
     if (cache_config_enable_checksum) {
       doc->checksum = 0;
-      for (char *b = doc->hdr; b < (char *) doc + doc->len; b++) {
+      for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
         doc->checksum += *b;
-      }
     }
-
-    if (vc->f.http_request && vc->f.single_segment)
+    if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment)
       ink_assert(doc->hlen);
 
     if (res_alt_blk)
@@ -994,86 +965,76 @@
    the eventProcessor to schedule events
    */
 int
-Part::aggWrite(int event, Event * e)
+Part::aggWrite(int event, void *e)
 {
   NOWARN_UNUSED(e);
   NOWARN_UNUSED(event);
 
+  Que(CacheVC, link) tocall;
+  CacheVC *c;
+
   cancel_trigger();
 
   ink_assert(!is_io_in_progress());
 Lagain:
   // calculate length of aggregated write
-  CacheVC * c;
-
   for (c = (CacheVC *) agg.head; c;) {
     int writelen = c->agg_len;
     ink_assert(writelen < AGG_SIZE);
-    if (agg_buf_pos + writelen > AGG_SIZE || header->write_pos + agg_buf_pos + writelen > (skip + len))
+    if (agg_buf_pos + writelen > AGG_SIZE || 
+        header->write_pos + agg_buf_pos + writelen > (skip + len))
       break;
-    Debug("agg_read", "copying: %d, %llu, key: %d", agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.word(0));
-    agg_copy(agg_buffer + agg_buf_pos, c);
+    Debug("agg_read", "copying: %d, %llu, key: %d", 
+          agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.word(0));
+    int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
+    ink_assert(writelen == wrotelen);
     agg_todo_size -= writelen;
     agg_buf_pos += writelen;
-    CacheVC *n = (CacheVC *) c->link.next;
+    CacheVC *n = (CacheVC *)c->link.next;
     agg.dequeue();
     if (c->f.evacuator)
       c->handleEvent(AIO_EVENT_DONE, 0);
     else
-      callback_cont->write_done.enqueue(c);
+      tocall.enqueue(c);
     c = n;
   }
 
   // if we got nothing...
   if (!agg_buf_pos) {
-    if (!agg.head) {
-      // nothing to get
-      return EVENT_DONE;
-    }
+    if (!agg.head) // nothing to get
+      return EVENT_CONT;
     if (header->write_pos == start) {
       // write aggregation too long, bad bad, punt on everything.
-      //action_tag_assert("cache", false);
       Note("write aggregation exceeds part size");
-      if (is_action_tag_set("cache")) {
-        ink_release_assert(false);
-      }
-      CacheVC *vc;
-      while ((vc = agg.dequeue())) {
-        agg_todo_size -= vc->agg_len;
-        // signal failure?
-        callback_cont->write_done.enqueue(vc);
-        if (!callback_cont->trigger)
-          callback_cont->trigger = eventProcessor.schedule_imm(callback_cont);
+      ink_assert(!tocall.head);
+      ink_assert(false);
+      while ((c = agg.dequeue())) {
+        agg_todo_size -= c->agg_len;
+        c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
       }
-      return EVENT_DONE;
+      return EVENT_CONT;
     }
     // start back
     agg_wrap();
     goto Lagain;
   }
 
-  if (!callback_cont->trigger) {
-    if (callback_cont->write_done.head)
-      callback_cont->trigger = eventProcessor.schedule_imm(callback_cont);
-  }
+  // set write limit
+  header->agg_pos = header->write_pos + agg_buf_pos;
+
   // evacuate space
-  ink_off_t end = header->write_pos + agg_buf_pos + EVAC_SIZE;
+  ink_off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
   if (evac_range(header->write_pos, end, !header->phase) < 0)
-    return EVENT_CONT;
+    goto Lwait;
   if (end > skip + len)
     if (evac_range(start, start + (end - (skip + len)), header->phase))
-      return EVENT_CONT;
+      goto Lwait;
 
   // if agg.head, then we are near the end of the disk, so
   // write down the aggregation in whatever size it is.
-  if (agg_buf_pos < MAX_AGG_LEN && !agg.head && !dir_sync_waiting)
-    return EVENT_CONT;
-  Debug("agg_read", "flushing: %d", agg_buf_pos);
-
-  // set write limit
-  header->agg_pos = header->write_pos + agg_buf_pos;
+  if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !dir_sync_waiting)
+    goto Lwait;
 
-  // do write
   io.aiocb.aio_fildes = fd;
   io.aiocb.aio_offset = header->write_pos;
   io.aiocb.aio_buf = agg_buffer;
@@ -1082,7 +1043,16 @@
   io.thread = mutex->thread_holding;
   SET_HANDLER(&Part::aggWriteDone);
   ink_aio_write(&io);
-  return EVENT_CONT;
+
+Lwait:
+  int ret = EVENT_CONT;
+  while ((c = tocall.dequeue())) {
+    if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding)
+      ret = EVENT_RETURN;
+    else
+      c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
+  }
+  return ret;
 }
 
 int
@@ -1100,21 +1070,21 @@
       VC_SCHED_LOCK_RETRY();
     }
     part->close_write(this);
-    if (closed < 0 && segment)
+    if (closed < 0 && fragment)
       dir_delete(&earliest_key, part, &earliest_dir);
   }
   if (is_debug_tag_set("cache_update")) {
     if (f.update && closed > 0) {
       if (!total_len && alternate_index != CACHE_ALT_REMOVED) {
         Debug("cache_update", "header only %d (%llu, %llu)\n",
-              DIR_MASK_TAG(first_key.word(1)), update_key.b[0], update_key.b[1]);
+              DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1]);
 
       } else if (total_len && alternate_index != CACHE_ALT_REMOVED) {
         Debug("cache_update", "header body, %d, (%llu, %llu), (%llu, %llu)\n",
-              DIR_MASK_TAG(first_key.word(1)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
+              DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
       } else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
         Debug("cache_update", "alt delete, %d, (%llu, %llu)\n",
-              DIR_MASK_TAG(first_key.word(1)), update_key.b[0], update_key.b[1]);
+              DIR_MASK_TAG(first_key.word(2)), update_key.b[0], update_key.b[1]);
       }
     }
   }
@@ -1123,29 +1093,14 @@
   // one, two and three or more fragments. This is because for
   // updates we dont decrement the variable corresponding the old
   // size of the document
-  /*
-     Debug("segment_size","Segment = %d\n",segment);
-     Debug("segment_size","total_len = %d\n",total_len);
-     Debug("segment_size","closed = %d\n",closed);
-     Debug("segment_size","f.update = %d\n",f.update);
-   */
   if ((closed == 1) && (total_len > 0)) {
-    Debug("cache_stats", "Segment = %d", segment);
-    switch (segment) {
-    case 0:
-      CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat);
-      break;
-
-    case 1:
-      CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat);
-      break;
-
-    default:
-      CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat);
-      break;
+    Debug("cache_stats", "Fragment = %d", fragment);
+    switch (fragment) {
+      case 0: CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat); break;
+      case 1: CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat); break;
+      default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
     }
   }
-
   return free_CacheVC(this);
 }
 
@@ -1153,43 +1108,47 @@
 CacheVC::openWriteCloseHeadDone(int event, Event * e)
 {
   NOWARN_UNUSED(e);
-  ink_assert(event == AIO_EVENT_DONE);
-  set_io_not_in_progress();
-  od->writing_vec = 0;
-
-  if (!io.ok())
-    return openWriteCloseDir(event, e);
-  ink_assert(f.use_first_key);
-  // lock taken by caller
-  ink_debug_assert(part->mutex->thread_holding == this_ethread());
-  if (!od->dont_update_directory) {
-    if (dir_is_empty(&od->first_dir)) {
-      dir_insert(&first_key, part, &dir);
-    } else {
-      // multiple segment vector write
-      dir_overwrite(&first_key, part, &dir, &od->first_dir, false);
-      // insert moved resident alternate
-      if (od->move_resident_alt) {
-        if (dir_valid(part, &od->single_doc_dir))
-          dir_insert(&od->single_doc_key, part, &od->single_doc_dir);
-        od->move_resident_alt = 0;
+  {
+    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+    if (!lock)
+      VC_LOCK_RETRY_EVENT();
+    if (event == AIO_EVENT_DONE)
+      set_io_not_in_progress();
+    else if (is_io_in_progress())
+      return EVENT_CONT;
+    od->writing_vec = 0;
+    if (!io.ok())
+      goto Lclose;
+    ink_assert(f.use_first_key);
+    if (!od->dont_update_directory) {
+      if (dir_is_empty(&od->first_dir)) {
+        dir_insert(&first_key, part, &dir);
+      } else {
+        // multiple fragment vector write
+        dir_overwrite(&first_key, part, &dir, &od->first_dir, false);
+        // insert moved resident alternate
+        if (od->move_resident_alt) {
+          if (dir_valid(part, &od->single_doc_dir))
+            dir_insert(&od->single_doc_key, part, &od->single_doc_dir);
+          od->move_resident_alt = 0;
+        }
       }
-    }
-    od->first_dir = dir;
-    if (f.http_request && f.single_segment) {
-      // segment is tied to the vector
-      od->move_resident_alt = 1;
-      if (!f.rewrite_resident_alt) {
-        od->single_doc_key = earliest_key;
+      od->first_dir = dir;
+      if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
+        // fragment is tied to the vector
+        od->move_resident_alt = 1;
+        if (!f.rewrite_resident_alt) {
+          od->single_doc_key = earliest_key;
+        }
+        dir_assign(&od->single_doc_dir, &dir);
+        dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
       }
-      dir_assign(&od->single_doc_dir, &dir);
-      dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(1));
     }
   }
+Lclose:
   return openWriteCloseDir(event, e);
 }
 
-
 int
 CacheVC::openWriteCloseHead(int event, Event * e)
 {
@@ -1198,7 +1157,7 @@
   cancel_trigger();
   f.use_first_key = 1;
   if (io.ok())
-    ink_assert(segment || (length == total_len));
+    ink_assert(fragment || (length == total_len));
   else
     return openWriteCloseDir(event, e);
   if (f.data_done)
@@ -1206,7 +1165,7 @@
   else
     write_len = length;
 #ifdef HTTP_CACHE
-  if (f.http_request) {
+  if (frag_type == CACHE_FRAG_TYPE_HTTP) {
     SET_HANDLER(&CacheVC::updateVector);
     return updateVector(EVENT_IMMEDIATE, 0);
   } else {
@@ -1214,7 +1173,6 @@
     SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
     return do_write_lock();
 #ifdef HTTP_CACHE
-
   }
 #endif
 }
@@ -1224,26 +1182,29 @@
 {
   NOWARN_UNUSED(e);
 
-  if (event == AIO_EVENT_DONE) {
+  if (event == AIO_EVENT_DONE)
     set_io_not_in_progress();
-
-    if (!io.ok())
-      return openWriteCloseDir(event, e);
+  else if (is_io_in_progress())
+    return EVENT_CONT;
+  if (!io.ok())
+    return openWriteCloseDir(event, e);
+  {
+    CACHE_TRY_LOCK(lock, part->mutex, this_ethread());
+    if (!lock)
+      VC_LOCK_RETRY_EVENT();
+    fragment++;
+    write_pos += write_len;
+    dir_insert(&key, part, &dir);
+    f.data_done = 1;
+    ink_assert(fragment);
+    return openWriteCloseHead(event, e); // must be called under part lock from here
   }
-  // locked taken by caller
-  ink_debug_assert(part->mutex->thread_holding == this_ethread());
-  dir_insert(&key, part, &dir);
-
-  f.data_done = 1;
-  ink_assert(segment);
-  return openWriteCloseHead(event, e);
 }
 
 int
 CacheVC::openWriteClose(int event, Event * e)
 {
   NOWARN_UNUSED(e);
-
   cancel_trigger();
   if (is_io_in_progress()) {
     if (event != AIO_EVENT_DONE)
@@ -1265,18 +1226,16 @@
       }
 #else
       return openWriteCloseDir(event, e);
-#endif //HTTP_CACHE
-
+#endif
     }
-    if (length && segment) {
+    if (length && fragment) {
       SET_HANDLER(&CacheVC::openWriteCloseDataDone);
       write_len = length;
-      return do_write_lock();
+      return do_write_lock_call();
     } else
       return openWriteCloseHead(event, e);
-  } else {
+  } else
     return openWriteCloseDir(event, e);
-  }
 }
 
 int
@@ -1285,11 +1244,11 @@
   NOWARN_UNUSED(e);
 
   cancel_trigger();
-  ink_assert(is_io_in_progress());
-  if (event != AIO_EVENT_DONE)
-    return EVENT_CONT;
-
-  set_io_not_in_progress();
+  if (event == AIO_EVENT_DONE)
+    set_io_not_in_progress();
+  else
+    if (is_io_in_progress())
+      return EVENT_CONT;
   // In the event of VC_EVENT_ERROR, the cont must do an io_close
   if (!io.ok()) {
     if (closed) {
@@ -1300,18 +1259,36 @@
     calluser(VC_EVENT_ERROR);
     return EVENT_CONT;
   }
-  // store the earliest directory. Need to remove the earliest dir
-  // in case the writer aborts.
-  if (!segment) {
-    ink_assert(key == earliest_key);
-    earliest_dir = dir;
-  }
-  segment++;
-  dir_insert(&key, part, &dir);
-  Debug("cache_insert", "WriteDone: %X, %X, %d", key.word(0), first_key.word(0), write_len);
-  blocks = iobufferblock_skip(blocks, &offset, &length, write_len);
-  write_len = length;
-  next_CacheKey(&key, &key);
+  {
+    CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+    if (!lock)
+      VC_LOCK_RETRY_EVENT();
+    // store the earliest directory. Need to remove the earliest dir
+    // in case the writer aborts.
+    if (!fragment) {
+      ink_assert(key == earliest_key);
+      earliest_dir = dir;
+    } else {
+      if (!frag) 
+        frag = &integral_frags[0];
+      else {
+        if (fragment-1 >= INTEGRAL_FRAGS && power_of_2((inku32)(fragment-1))) {
+          Frag *t = frag;
+          frag = (Frag*)xmalloc(sizeof(Frag) * (fragment-1)*2);
+          memcpy(frag, t, sizeof(Frag) * (fragment-2));
+          if (t != integral_frags)
+            xfree(t);
+        }
+      }
+      frag[fragment-1].offset = write_pos;
+    }
+    fragment++;
+    write_pos += write_len;
+    dir_insert(&key, part, &dir);
+    Debug("cache_insert", "WriteDone: %X, %X, %d", key.word(0), first_key.word(0), write_len);
+    blocks = iobufferblock_skip(blocks, &offset, (int*)&length, write_len);
+    next_CacheKey(&key, &key);
+  }
   if (closed)
     return die();
   SET_HANDLER(&CacheVC::openWriteMain);
@@ -1340,10 +1317,10 @@
     if (vio.ntodo() <= 0)
       return EVENT_CONT;
   }
-  ink64 ntodo = (ink64) vio.ntodo() + length;
-  int total_avail = vio.buffer.reader()->read_avail();
-  int avail = total_avail;
-  int towrite = avail + length;
+  ink64 ntodo = (ink64)(vio.ntodo() + length);
+  ink64 total_avail = vio.buffer.reader()->read_avail();
+  ink64 avail = total_avail;
+  ink64 towrite = avail + length;
   if (towrite > ntodo) {
     avail -= (towrite - ntodo);
     towrite = ntodo;
@@ -1361,7 +1338,7 @@
     vio.ndone += avail;
     total_len += avail;
   }
-  length = towrite;
+  length = (inku64)towrite;
   if (length > TARGET_FRAG_SIZE && length < SHRINK_TARGET_FRAG_SIZE)
     write_len = TARGET_FRAG_SIZE;
   else
@@ -1380,7 +1357,7 @@
     return EVENT_CONT;
 
   SET_HANDLER(&CacheVC::openWriteWriteDone);
-  return do_write_lock();
+  return do_write_lock_call();
 }
 
 // begin overwrite
@@ -1388,7 +1365,6 @@
 CacheVC::openWriteOverwrite(int event, Event * e)
 {
   NOWARN_UNUSED(e);
-  int res;
 
   cancel_trigger();
   if (event != AIO_EVENT_DONE) {
@@ -1409,20 +1385,24 @@
     od->first_dir = dir;
     goto Ldone;
   }
-
 Lcollision:
   {
     CACHE_TRY_LOCK(lock, part->mutex, this_ethread());
     if (!lock)
-      VC_SCHED_LOCK_RETRY();
-    res = dir_probe(&first_key, part, &dir, &last_collision);
-    if (res > 0)
-      return do_read(&first_key);
+      VC_LOCK_RETRY_EVENT();
+    int res = dir_probe(&first_key, part, &dir, &last_collision);
+    if (res > 0) {
+      if ((res = do_read_call(&first_key)) == EVENT_RETURN)
+        goto Lcallreturn;
+      return res;
+    }
   }
 Ldone:
   SET_HANDLER(&CacheVC::openWriteMain);
   _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) this);
   return EVENT_DONE;
+Lcallreturn:
+  return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
 }
 
 #ifdef HTTP_CACHE
@@ -1458,7 +1438,7 @@
     if (!dir_valid(part, &dir)) {
       Debug("cache_write",
             "OpenReadStartDone: Dir not valid: Write Head: %d, Dir: %d",
-            offset_to_part_offset(part, part->header->write_pos), dir.offset);
+            offset_to_part_offset(part, part->header->write_pos), dir_offset(&dir));
       last_collision = NULL;
       goto Lcollision;
     }
@@ -1472,21 +1452,21 @@
       err = ECACHE_BAD_META_DATA;
       goto Lfailure;
     }
-    ink_assert((((unsigned long) &doc->hdr[0]) & HDR_PTR_ALIGNMENT_MASK) == 0);
+    ink_assert((((unsigned long) &doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK) == 0);
 
-    if (write_vector->get_handles(doc->hdr, doc->hlen, buf) != doc->hlen) {
+    if (write_vector->get_handles(doc->hdr(), doc->hlen, buf) != doc->hlen) {
       err = ECACHE_BAD_META_DATA;
       goto Lfailure;
     }
     ink_debug_assert(write_vector->count() > 0);
     od->first_dir = dir;
     first_dir = dir;
-    if (doc->single_segment()) {
-      // segment is tied to the vector
+    if (doc->single_fragment()) {
+      // fragment is tied to the vector
       od->move_resident_alt = 1;
       od->single_doc_key = doc->key;
       dir_assign(&od->single_doc_dir, &dir);
-      dir_set_tag(&od->single_doc_dir, DIR_MASK_TAG(od->single_doc_key.word(1)));
+      dir_set_tag(&od->single_doc_dir, od->single_doc_key.word(2));
     }
     first_buf = buf;
     goto Lsuccess;
@@ -1497,12 +1477,13 @@
     int if_writers = ((uintptr_t) info == CACHE_ALLOW_MULTIPLE_WRITES);
     CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
     if (!lock)
-      VC_SCHED_LOCK_RETRY();
+      VC_LOCK_RETRY_EVENT();
     if (!od) {
-      if ((err = part->open_write(this, if_writers,
-                                  cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
+      if ((err = part->open_write(
+             this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
         goto Lfailure;
       if (od->has_multiple_writers()) {
+        MUTEX_RELEASE(lock);
         SET_HANDLER(&CacheVC::openWriteMain);
         _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) this);
         return EVENT_DONE;
@@ -1511,13 +1492,15 @@
     // check for collision
     if (dir_probe(&first_key, part, &dir, &last_collision)) {
       od->reading_vec = 1;
-      return do_read(&first_key);
+      int ret = do_read_call(&first_key);
+      if (ret == EVENT_RETURN)
+        goto Lcallreturn;
+      return ret;
     }
     if (f.update) {
       // fail update because vector has been GC'd
       goto Lfailure;
     }
-
   }
 Lsuccess:
   od->reading_vec = 0;
@@ -1538,6 +1521,8 @@
     return EVENT_DONE;
   } else
     return free_CacheVC(this);
+Lcallreturn:
+  return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
 }
 #endif
 // handle lock failures from main Cache::open_write entry points below
@@ -1556,13 +1541,16 @@
     VC_SCHED_LOCK_RETRY();
   if ((err = part->open_write(this, false, 1)) > 0) {
     CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
+    free_CacheVC(this);
+    MUTEX_RELEASE(lock);
     _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
-    return free_CacheVC(this);
+    return 0;
   }
   if (f.overwrite) {
     SET_HANDLER(&CacheVC::openWriteOverwrite);
     return openWriteOverwrite(EVENT_IMMEDIATE, 0);
   } else {
+    MUTEX_RELEASE(lock);
     // write by key
     SET_HANDLER(&CacheVC::openWriteMain);
     _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) this);
@@ -1592,8 +1580,9 @@
   Part *part = c->part;
   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
   c->first_key = c->key = *key;
+  c->frag_type = frag_type;
   /*
-     The transition from single segment document to a multi-segment document
+     The transition from single fragment document to a multi-fragment document
      would cause a problem if the key and the first_key collide. In case of
      a collision, old vector data could be served to HTTP. Need to avoid that.
      Also, when evacuating a fragment, we have to decide if its the first_key
@@ -1601,8 +1590,7 @@
    */
   do {
     rand_CacheKey(&c->key, cont->mutex);
-  }
-  while (DIR_MASK_TAG(c->key.word(1)) == DIR_MASK_TAG(c->first_key.word(1)));
+  } while (DIR_MASK_TAG(c->key.word(2)) == DIR_MASK_TAG(c->first_key.word(2)));
   c->earliest_key = c->key;
 #ifdef HTTP_CACHE
   c->info = 0;
@@ -1656,7 +1644,7 @@
   c->vio.op = VIO::WRITE;
   c->first_key = *key;
   /*
-     The transition from single segment document to a multi-segment document
+     The transition from single fragment document to a multi-fragment document
      would cause a problem if the key and the first_key collide. In case of
      a collision, old vector data could be served to HTTP. Need to avoid that.
      Also, when evacuating a fragment, we have to decide if its the first_key
@@ -1665,9 +1653,9 @@
   do {
     rand_CacheKey(&c->key, cont->mutex);
   }
-  while (DIR_MASK_TAG(c->key.word(1)) == DIR_MASK_TAG(c->first_key.word(1)));
+  while (DIR_MASK_TAG(c->key.word(2)) == DIR_MASK_TAG(c->first_key.word(2)));
   c->earliest_key = c->key;
-  c->f.http_request = 1;
+  c->frag_type = CACHE_FRAG_TYPE_HTTP;
   c->part = key_to_part(key, hostname, host_len);
   Part *part = c->part;
   c->info = info;
@@ -1712,44 +1700,47 @@
   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
   c->pin_in_cache = (inku32) apin_in_cache;
 
-  CACHE_TRY_LOCK(lock, c->part->mutex, cont->mutex->thread_holding);
-  if (lock) {
-    if ((err = c->part->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
-      goto Lfailure;
-    }
-    // If there are multiple writers, then this one cannot be an update.
-    // Only the first writer can do an update. If that's the case, we can
-    // return success to the state machine now.;
-    if (c->od->has_multiple_writers()) {
-      SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
-      cont->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) c);
-      return ACTION_RESULT_DONE;
-    }
-    if (!dir_probe(key, c->part, &c->dir, &c->last_collision)) {
-      if (c->f.update) {
-        // fail update because vector has been GC'd
-        // This situation can also arise in openWriteStartDone
-        err = ECACHE_NO_DOC;
+  {
+    CACHE_TRY_LOCK(lock, c->part->mutex, cont->mutex->thread_holding);
+    if (lock) {
+      if ((err = c->part->open_write(c, if_writers, 
+                                     cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0)
         goto Lfailure;
+      // If there are multiple writers, then this one cannot be an update.
+      // Only the first writer can do an update. If that's the case, we can
+      // return success to the state machine now.
+      if (c->od->has_multiple_writers())
+        goto Lmiss;
+      if (!dir_probe(key, c->part, &c->dir, &c->last_collision)) {
+        if (c->f.update) {
+          // fail update because vector has been GC'd
+          // This situation can also arise in openWriteStartDone
+          err = ECACHE_NO_DOC;
+          goto Lfailure;
+        }
+        // document doesn't exist, begin write
+        goto Lmiss;
+      } else {
+        c->od->reading_vec = 1;
+        // document exists, read vector
+        SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
+        switch (c->do_read_call(&c->first_key)) {
+          case EVENT_DONE: return ACTION_RESULT_DONE;
+          case EVENT_RETURN: goto Lcallreturn;
+          default: return &c->_action;
+        }
       }
-      // document doesn't exist, begin write
-      SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
-      cont->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) c);
-      return ACTION_RESULT_DONE;
-    } else {
-      c->od->reading_vec = 1;
-      // document exists, read vector
-      SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
-      if (c->do_read(&c->first_key) == EVENT_CONT)
-        return &c->_action;
-      else
-        return ACTION_RESULT_DONE;
     }
+    // missed lock
+    SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
+    CONT_SCHED_LOCK_RETRY(c);
+    return &c->_action;
   }
-  // missed lock
-  SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
-  CONT_SCHED_LOCK_RETRY(c);
-  return &c->_action;
+
+Lmiss:
+  SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
+  cont->handleEvent(CACHE_EVENT_OPEN_WRITE, (void *) c);
+  return ACTION_RESULT_DONE;
 
 Lfailure:
   CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
@@ -1761,5 +1752,10 @@
   }
   free_CacheVC(c);
   return ACTION_RESULT_DONE;
+
+Lcallreturn:
+  if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+    return ACTION_RESULT_DONE;
+  return &c->_action;
 }
 #endif

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h Fri Dec 18 16:47:37 2009
@@ -128,8 +128,6 @@
 
 struct CacheVConnection:public VConnection
 {
-  CacheVConnection();
-
   VIO *do_io_read(Continuation * c, int nbytes, MIOBuffer * buf) = 0;
   VIO *do_io_write(Continuation * c, int nbytes, IOBufferReader * buf, bool owner = false) = 0;
   void do_io_close(int lerrno = -1) = 0;
@@ -141,13 +139,25 @@
     ink_assert(!"CacheVConnection::do_io_shutdown unsupported");
   }
 
+  virtual int get_header(void **ptr, int *len) = 0;
+  virtual int set_header(void *ptr, int len) = 0;
+  // do_io_pread() may only be issued once in response to CACHE_EVENT_OPEN_READ
+  virtual VIO *do_io_pread(Continuation *c, ink64 nbytes, MIOBuffer *buf, ink_off_t off) = 0;
+
 #ifdef HTTP_CACHE
   virtual void set_http_info(CacheHTTPInfo * info) = 0;
   virtual void get_http_info(CacheHTTPInfo ** info) = 0;
 #endif
 
-  virtual bool is_ram_cache_hit() = 0;
   virtual Action *action() = 0;
+  virtual bool is_ram_cache_hit() = 0;
+  virtual bool set_disk_io_priority(int priority) = 0;
+  virtual int get_disk_io_priority() = 0;
+  virtual bool set_pin_in_cache(time_t t) = 0;
+  virtual time_t get_pin_in_cache() = 0;
+  virtual int get_object_size() = 0;
+
+  CacheVConnection();
 };
 
 void ink_cache_init(ModuleVersion version);

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h Fri Dec 18 16:47:37 2009
@@ -33,10 +33,10 @@
 #define CACHE_ALT_INDEX_DEFAULT     -1
 #define CACHE_ALT_REMOVED           -2
 
-#define CACHE_DB_MAJOR_VERSION      19
+#define CACHE_DB_MAJOR_VERSION      20
 #define CACHE_DB_MINOR_VERSION      0
 
-#define CACHE_DIR_MAJOR_VERSION     16
+#define CACHE_DIR_MAJOR_VERSION     17
 #define CACHE_DIR_MINOR_VERSION     0
 
 #define CACHE_DB_FDS                128
@@ -126,4 +126,11 @@
 
 #define CacheKey INK_MD5
 #define CACHE_ALLOW_MULTIPLE_WRITES 1
+
+/* uses of the CacheKey
+   word(0) - cache partition segment
+   word(1) - cache partition bucket
+   word(2) - tag (lower bits), hosttable hash (upper bits)
+   word(3) - ram cache hash, lookaside cache
+ */
 #endif // __CACHE_DEFS_H__

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h Fri Dec 18 16:47:37 2009
@@ -90,6 +90,46 @@
   {
     return &_action;
   }
+  bool set_pin_in_cache(time_t time_pin)
+  {
+    ink_assert(!"implemented");
+    return false;
+  }
+  bool set_disk_io_priority(int priority)
+  {
+    ink_assert(!"implemented");
+    return false;
+  }
+  time_t get_pin_in_cache()
+  {
+    ink_assert(!"implemented");
+    return 0;
+  }
+  int
+  get_disk_io_priority()
+  {
+    ink_assert(!"implemented");
+    return 0;
+  }
+  int get_header(void **ptr, int *len) 
+  {
+    ink_assert(!"implemented");
+    return -1;
+  }
+  int set_header(void *ptr, int len) 
+  {
+    ink_assert(!"implemented");
+    return -1;
+  }
+  int get_object_size()
+  {
+    ink_assert(!"implemented");
+    return -1;
+  }
+  VIO *do_io_pread(Continuation *c, ink64 nbytes, MIOBuffer *buf, ink_off_t off) {
+    ink_assert(!"implemented");
+    return 0;
+  }
 
   bool appendCacheHttpInfo(const void *data, const inku64 size);
   bool completeCacheHttpInfo(const void *data, const inku64 size);