You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2010/03/04 19:53:09 UTC

svn commit: r919125 - in /incubator/trafficserver/traffic/trunk/iocore: cache/ cluster/

Author: jplevyak
Date: Thu Mar  4 18:53:09 2010
New Revision: 919125

URL: http://svn.apache.org/viewvc?rev=919125&view=rev
Log:
TS-222: Cache::open_write should take options arg and allow SYNC writes

Modified:
    incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc
    incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc
    incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc
    incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc
    incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h
    incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h
    incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h
    incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h
    incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h
    incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc Thu Mar  4 18:53:09 2010
@@ -253,9 +253,11 @@
 CacheVC::do_io_close(int alerrno)
 {
   ink_debug_assert(mutex->thread_holding == this_ethread());
+  int previous_closed = closed;
   closed = (alerrno == -1) ? 1 : -1;    // Stupid default arguments
   DDebug("cache_close", "do_io_close %lX %d %d", (long) this, alerrno, closed);
-  die();
+  if (!previous_closed && !recursive)
+    die();
 }
 
 void
@@ -338,7 +340,6 @@
 void
 CacheVC::set_http_info(CacheHTTPInfo *ainfo)
 {
-
   ink_assert(!total_len);
   if (f.update) {
     ainfo->object_key_set(update_key);
@@ -354,7 +355,6 @@
 
 bool CacheVC::set_pin_in_cache(time_t time_pin)
 {
-
   if (total_len) {
     ink_assert(!"should Pin the document before writing");
     return false;
@@ -1056,7 +1056,6 @@
 int
 Part::handle_dir_clear(int event, void *data)
 {
-
   int dir_len = part_dirlen(this);
   AIOCallback *op;
 
@@ -1186,7 +1185,6 @@
          were written to just before syncing the directory) and make sure
          that all documents have write_serial <= header->write_serial.
        */
-
       int to_check = header->write_pos - header->last_write_pos;
       ink_assert(to_check && to_check < (int) io.aiocb.aio_nbytes);
       int done = 0;
@@ -1198,7 +1196,6 @@
           goto Lclear;
         }
         done += round_to_approx_size(doc->len);
-
         if (doc->sync_serial > last_write_serial)
           last_sync_serial = doc->sync_serial;
       }
@@ -2798,7 +2795,6 @@
     }
   }
 #endif
-
   // cache plugin
   if (cache_global_hooks != NULL && cache_global_hooks->hooks_set > 0) {
     Debug("cache_plugin", "[CacheProcessor::open_write] Cache hooks are set, old_info=%lX", (long) old_info);
@@ -2819,8 +2815,6 @@
       return ACTION_RESULT_DONE;
     }
   }
-
-
   return caches[type]->open_write(cont, url, request, old_info, pin_in_cache, type);
 }
 

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc Thu Mar  4 18:53:09 2010
@@ -1022,12 +1022,6 @@
     } else
 #endif
     {
-      // non-http docs have the total len set in the first fragment
-      if (doc->hlen) {
-        ink_debug_assert(!"Cache::openReadStartHead non-http request" " for http doc");
-        err = -ECACHE_BAD_READ_REQUEST;
-        goto Ldone;
-      }
       next_CacheKey(&key, &doc->key);
       f.single_fragment = doc->single_fragment();
       doc_pos = doc->prefix_len();

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc Thu Mar  4 18:53:09 2010
@@ -56,7 +56,7 @@
   expect_event(EVENT_NONE),
   expect_initial_event(EVENT_NONE),
   initial_event(EVENT_NONE),
-  flags(0)
+  content_salt(0)
 {
   SET_HANDLER(&CacheTestSM::event_handler);
 }
@@ -70,6 +70,16 @@
     free_MIOBuffer(buffer);
 }
 
+int CacheTestSM::open_read_callout() {
+  cvio = cache_vc->do_io_read(this, total_size, buffer);
+  return 1;
+}
+
+int CacheTestSM::open_write_callout() {
+  cvio = cache_vc->do_io_write(this, total_size, buffer_reader);
+  return 1;
+}
+
 int CacheTestSM::event_handler(int event, void *data) {
 
   switch (event) {
@@ -100,8 +110,10 @@
       cache_vc = (CacheVConnection*)data;
       buffer = new_empty_MIOBuffer();
       buffer_reader = buffer->alloc_reader();
-      cvio = cache_vc->do_io_read(this, total_size, buffer);
-      return EVENT_DONE;
+      if (open_read_callout() < 0)
+        goto Lclose_error_next;
+      else
+        return EVENT_DONE;
    
     case CACHE_EVENT_OPEN_READ_FAILED:
       goto Lcancel_next;
@@ -129,8 +141,10 @@
       cache_vc = (CacheVConnection*)data;
       buffer = new_empty_MIOBuffer();
       buffer_reader = buffer->alloc_reader();
-      cvio = cache_vc->do_io_write(this, total_size, buffer_reader);
-      return EVENT_DONE;
+      if (open_write_callout() < 0)
+        goto Lclose_error_next;
+      else
+        return EVENT_DONE;
       
     case CACHE_EVENT_OPEN_WRITE_FAILED:
       goto Lcancel_next;
@@ -209,6 +223,7 @@
 void CacheTestSM::fill_buffer() {
   ink64 avail = buffer->write_avail();
   CacheKey k = key;
+  k.b[1] += content_salt;
   ink64 sk = (ink64)sizeof(key);
   while (avail > 0) {
     ink64 l = avail;
@@ -229,6 +244,7 @@
 int CacheTestSM::check_buffer() { 
   ink64 avail = buffer_reader->read_avail();
   CacheKey k = key;
+  k.b[1] += content_salt;
   char b[sizeof(key)];
   ink64 sk = (ink64)sizeof(key);
   ink64 pos = cvio->ndone -  buffer_reader->read_avail();
@@ -267,25 +283,9 @@
 }
 
 CacheTestSM::CacheTestSM(const CacheTestSM &ao) : RegressionSM(ao) {
-  timeout = ao.timeout;
-  cache_action = ao.cache_action; 
-  start_time = ao.start_time; 
-  cache_vc = ao.cache_vc;
-  cvio = ao.cvio;
-  buffer = ao.buffer;
-  buffer_reader = ao.buffer_reader;
-#ifdef HTTP_CACHE
-  params = ao.params;
-  info = ao.info;
-#endif
-  total_size = ao.total_size;
-  memcpy(urlstr, ao.urlstr, 1024);
-  key = ao.key;
-  repeat_count = ao.repeat_count;
-  expect_event = ao.expect_event;
-  expect_initial_event = ao.expect_initial_event;
-  initial_event = ao.initial_event;
-  flags = ao.flags;
+  int o = (int)(((char*)&start_memcpy_on_clone) - ((char*)this));
+  int s = (int)(((char*)&end_memcpy_on_clone) - ((char*)&start_memcpy_on_clone));
+  memcpy(((char*)this)+o, ((char*)&ao)+o, s);
   SET_HANDLER(&CacheTestSM::event_handler);
 }
 
@@ -298,7 +298,9 @@
 
   EThread *thread = this_ethread();
 
-  CACHE_SM(t, write_test, { cacheProcessor.open_write(this, &key); } );
+  CACHE_SM(t, write_test, { cacheProcessor.open_write(
+        this, &key, CACHE_FRAG_TYPE_NONE, 100, 
+        CACHE_WRITE_OPT_SYNC); } );
   write_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
   write_test.expect_event = VC_EVENT_WRITE_COMPLETE;
   write_test.total_size = 100;
@@ -330,6 +332,62 @@
   remove_fail_test.expect_event = CACHE_EVENT_REMOVE_FAILED;
   rand_CacheKey(&remove_fail_test.key, thread->mutex);
 
+  CACHE_SM(t, replace_write_test, { 
+      cacheProcessor.open_write(this, &key, CACHE_FRAG_TYPE_NONE, 100, 
+                                CACHE_WRITE_OPT_SYNC);
+    }
+    int open_write_callout() {
+      header.serial = 10;
+      cache_vc->set_header(&header, sizeof(header));
+      cvio = cache_vc->do_io_write(this, total_size, buffer_reader);
+      return 1;
+    });
+  replace_write_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
+  replace_write_test.expect_event = VC_EVENT_WRITE_COMPLETE;
+  replace_write_test.total_size = 100;
+  rand_CacheKey(&replace_write_test.key, thread->mutex);
+
+  CACHE_SM(t, replace_test, {
+      cacheProcessor.open_write(this, &key, CACHE_FRAG_TYPE_NONE, 100,
+                                CACHE_WRITE_OPT_OVERWRITE_SYNC);
+    }   
+    int open_write_callout() {
+      CacheTestHeader *h = 0;
+      int hlen = 0;
+      if (cache_vc->get_header((void**)&h, &hlen) < 0)
+        return -1;
+      if (h->serial != 10)
+        return -1;
+      header.serial = 11;
+      cache_vc->set_header(&header, sizeof(header));
+      cvio = cache_vc->do_io_write(this, total_size, buffer_reader);
+      return 1;
+    });
+  replace_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
+  replace_test.expect_event = VC_EVENT_WRITE_COMPLETE;
+  replace_test.total_size = 100;
+  replace_test.key = replace_write_test.key;
+  replace_test.content_salt = 1;
+
+  CACHE_SM(t, replace_read_test, {
+      cacheProcessor.open_read(this, &key); 
+    }
+    int open_read_callout() {
+      CacheTestHeader *h = 0;
+      int hlen = 0;
+      if (cache_vc->get_header((void**)&h, &hlen) < 0)
+        return -1;
+      if (h->serial != 11)
+        return -1;
+      cvio = cache_vc->do_io_read(this, total_size, buffer);
+      return 1;
+    });
+  replace_read_test.expect_initial_event = CACHE_EVENT_OPEN_READ;
+  replace_read_test.expect_event = VC_EVENT_READ_COMPLETE;
+  replace_read_test.total_size = 100;
+  replace_read_test.key = replace_test.key;
+  replace_read_test.content_salt = 1;
+
   r_sequential(
     t,
     write_test.clone(),
@@ -339,6 +397,9 @@
     lookup_fail_test.clone(),
     read_fail_test.clone(),
     remove_fail_test.clone(),
+    replace_write_test.clone(),
+    replace_test.clone(),
+    replace_read_test.clone(),
     NULL
     )->run(pstatus);
   return;

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc Thu Mar  4 18:53:09 2010
@@ -24,23 +24,10 @@
 
 #include "P_Cache.h"
 
-
-#define HDR_PTR_SIZE            (sizeof(inku64))
-#define HDR_PTR_ALIGNMENT_MASK  ((HDR_PTR_SIZE) - 1L)
-
-extern int cache_config_agg_write_backlog;
-
-static inline int power_of_2(inku32 x) {
-  while (x) {
-    if (x & 1) {
-      if (x >> 1) 
-        return 0;
-      return 1;
-    }
-    x >>= 1; 
-  }
-  return 1;
-}
+#define IS_POWER_2(_x) (!((_x)&((_x)-1)))
+#define UINT_WRAP_LTE(_x, _y) (((_y)-(_x)) < INT_MAX) // exploit overflow
+#define UINT_WRAP_GTE(_x, _y) (((_x)-(_y)) < INT_MAX) // exploit overflow
+#define UINT_WRAP_LT(_x, _y) (((_x)-(_y)) >= INT_MAX) // exploit overflow
 
 // Given a key, finds the index of the alternate which matches
 // used to get the alternate which is actually present in the document
@@ -196,7 +183,7 @@
     ink_assert(od->writing_vec);
     header_len = write_vector->marshal_length();
     ink_assert(header_len > 0);
-  } else
+  } else if (f.use_first_key)
 #endif
     header_len = header_to_write_len;
   if (f.use_first_key && fragment) {
@@ -331,11 +318,10 @@
 
   cancel_trigger();
 
+  // ensure we have the cacheDirSync lock if we intend to call it later
+  // retaking the current mutex recursively is a NOOP
   CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
   if (!lock) {
-    // INKqa10347
-    // Race condition between cacheDirSync and the part when setting the
-    // dir_sync_waiting flag
     eventProcessor.schedule_in(this, MUTEX_RETRY_DELAY);
     return EVENT_CONT;
   }
@@ -369,11 +355,21 @@
     agg_buf_pos = 0;
   }
   set_io_not_in_progress();
+  // callback ready sync CacheVCs
+  CacheVC *c = 0;
+  while ((c = sync.dequeue())) {
+    if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
+      c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
+    else {
+      sync.push(c); // put it back on the front
+      break;
+    }
+  }
   if (dir_sync_waiting) {
     dir_sync_waiting = 0;
     cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
   }
-  if (agg.head)
+  if (agg.head || sync.head)
     return aggWrite(event, e);
   return EVENT_CONT;
 }
@@ -774,7 +770,7 @@
     doc->total_len = vc->total_len;
     doc->first_key = vc->first_key;
     doc->sync_serial = part->header->sync_serial;
-    doc->write_serial = part->header->write_serial;
+    vc->write_serial = doc->write_serial = part->header->write_serial;
     doc->checksum = DOC_NO_CHECKSUM;
     if (vc->pin_in_cache) {
       dir_set_pinned(&vc->dir, 1);
@@ -963,13 +959,13 @@
   periodic_scan();
 }
 
-/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
+/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
    DON'T schedule any events on this thread using VC_SCHED_XXX or
    mutex->thread_holding->schedule_xxx_local(). ALWAYS use 
    eventProcessor.schedule_xxx().
    Also, make sure that any functions called by this also use
    the eventProcessor to schedule events
-   */
+*/
 int
 Part::aggWrite(int event, void *e)
 {
@@ -998,7 +994,12 @@
     agg_buf_pos += writelen;
     CacheVC *n = (CacheVC *)c->link.next;
     agg.dequeue();
-    if (c->f.evacuator)
+    if (c->f.sync && c->f.use_first_key) {
+      CacheVC *last = sync.tail;
+      while (last && UINT_WRAP_LT(c->write_serial, last->write_serial))
+        last = (CacheVC*)last->link.prev;
+      sync.insert(c, last);
+    } else if (c->f.evacuator)
       c->handleEvent(AIO_EVENT_DONE, 0);
     else
       tocall.enqueue(c);
@@ -1007,7 +1008,7 @@
 
   // if we got nothing...
   if (!agg_buf_pos) {
-    if (!agg.head) // nothing to get
+    if (!agg.head && !sync.head) // nothing to get
       return EVENT_CONT;
     if (header->write_pos == start) {
       // write aggregation too long, bad bad, punt on everything.
@@ -1021,8 +1022,10 @@
       return EVENT_CONT;
     }
     // start back
-    agg_wrap();
-    goto Lagain;
+    if (agg.head) {
+      agg_wrap();
+      goto Lagain;
+    }
   }
 
   // evacuate space
@@ -1035,9 +1038,22 @@
 
   // if agg.head, then we are near the end of the disk, so
   // write down the aggregation in whatever size it is.
-  if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !dir_sync_waiting)
+  if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting)
     goto Lwait;
 
+  // write sync marker
+  if (!agg_buf_pos) {
+    ink_assert(sync.head);
+    int l = round_to_approx_size(sizeof(Doc));
+    agg_buf_pos = l;
+    Doc *d = (Doc*)agg_buffer;
+    memset(d, 0, sizeof(Doc));
+    d->magic = DOC_MAGIC;
+    d->len = l;
+    d->sync_serial = header->sync_serial;
+    d->write_serial = header->write_serial;
+  }
+
   // set write limit
   header->agg_pos = header->write_pos + agg_buf_pos;
 
@@ -1112,6 +1128,12 @@
       default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
     }
   }
+  if (f.close_complete) {
+    recursive++;
+    ink_debug_assert(!part || this_ethread() != part->mutex->thread_holding);
+    vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *) &vio);
+    recursive--;
+  }
   return free_CacheVC(this);
 }
 
@@ -1119,14 +1141,14 @@
 CacheVC::openWriteCloseHeadDone(int event, Event *e)
 {
   NOWARN_UNUSED(e);
+  if (event == AIO_EVENT_DONE)
+    set_io_not_in_progress();
+  else if (is_io_in_progress())
+    return EVENT_CONT;
   {
     CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
     if (!lock)
       VC_LOCK_RETRY_EVENT();
-    if (event == AIO_EVENT_DONE)
-      set_io_not_in_progress();
-    else if (is_io_in_progress())
-      return EVENT_CONT;
     od->writing_vec = 0;
     if (!io.ok())
       goto Lclose;
@@ -1282,7 +1304,7 @@
       if (!frag) 
         frag = &integral_frags[0];
       else {
-        if (fragment-1 >= INTEGRAL_FRAGS && power_of_2((inku32)(fragment-1))) {
+        if (fragment-1 >= INTEGRAL_FRAGS && IS_POWER_2((inku32)(fragment-1))) {
           Frag *t = frag;
           frag = (Frag*)xmalloc(sizeof(Frag) * (fragment-1)*2);
           memcpy(frag, t, sizeof(Frag) * (fragment-2));
@@ -1324,6 +1346,7 @@
     called_user = 1;
     if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
       return EVENT_DONE;
+    ink_assert(!"close expected after write COMPLETE");
     if (vio.ntodo() <= 0)
       return EVENT_CONT;
   }
@@ -1365,7 +1388,11 @@
   }
   if (not_writing)
     return EVENT_CONT;
-
+  if (towrite == ntodo && f.close_complete) {
+    closed = 1;
+    SET_HANDLER(&CacheVC::openWriteClose);
+    return openWriteClose(EVENT_NONE, NULL);
+  }
   SET_HANDLER(&CacheVC::openWriteWriteDone);
   return do_write_lock_call();
 }
@@ -1393,6 +1420,7 @@
     if (!(doc->first_key == first_key))
       goto Lcollision;
     od->first_dir = dir;
+    first_buf = buf;
     goto Ldone;
   }
 Lcollision:
@@ -1543,22 +1571,19 @@
   cancel_trigger();
   if (_action.cancelled)
     return free_CacheVC(this);
-  CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
-  if (!lock)
-    VC_SCHED_LOCK_RETRY();
-  if ((err = part->open_write(this, false, 1)) > 0) {
+  if ((err = part->open_write_lock(this, false, 1)) > 0) {
     CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
     free_CacheVC(this);
-    MUTEX_RELEASE(lock);
     _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
     return 0;
   }
+  if (err < 0)
+    VC_SCHED_LOCK_RETRY();
   if (f.overwrite) {
-    SET_HANDLER(&CacheVC::openWriteOverwrite);
+     SET_HANDLER(&CacheVC::openWriteOverwrite);
     return openWriteOverwrite(EVENT_IMMEDIATE, 0);
   } else {
-    MUTEX_RELEASE(lock);
-    // write by key
+     // write by key
     SET_HANDLER(&CacheVC::openWriteMain);
     return callcont(CACHE_EVENT_OPEN_WRITE);
   }
@@ -1567,7 +1592,7 @@
 // main entry point for writing of of non-http documents
 Action *
 Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
-                  bool overwrite, time_t apin_in_cache, char *hostname, int host_len)
+                  int options, time_t apin_in_cache, char *hostname, int host_len)
 {
 
   if (!(CacheProcessor::cache_ready & frag_type)) {
@@ -1601,7 +1626,9 @@
 #ifdef HTTP_CACHE
   c->info = 0;
 #endif
-  c->f.overwrite = overwrite;
+  c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
+  c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
+  c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
   c->pin_in_cache = (inku32) apin_in_cache;
 
   if ((res = c->part->open_write_lock(c, false, 1)) > 0) {
@@ -1616,7 +1643,7 @@
     c->trigger = CONT_SCHED_LOCK_RETRY(c);
     return &c->_action;
   }
-  if (!overwrite) {
+  if (!c->f.overwrite) {
     SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
     c->callcont(CACHE_EVENT_OPEN_WRITE);
     return ACTION_RESULT_DONE;

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h Thu Mar  4 18:53:09 2010
@@ -37,6 +37,11 @@
 						   CACHE_MODULE_MINOR_VERSION,\
 						   PUBLIC_MODULE_HEADER)
 
+#define CACHE_WRITE_OPT_OVERWRITE       0x0001
+#define CACHE_WRITE_OPT_CLOSE_COMPLETE  0x0002
+#define CACHE_WRITE_OPT_SYNC            (CACHE_WRITE_OPT_CLOSE_COMPLETE | 0x0004)
+#define CACHE_WRITE_OPT_OVERWRITE_SYNC  (CACHE_WRITE_OPT_SYNC | CACHE_WRITE_OPT_OVERWRITE)
+
 class CacheLookupHttpConfig;
 class CacheVC;
 #ifdef HTTP_CACHE
@@ -70,13 +75,13 @@
                                 CacheKey *key, 
                                 CacheFragType frag_type = CACHE_FRAG_TYPE_NONE,
                                 int expected_size = CACHE_EXPECTED_SIZE,
-                                bool overwrite = false,
+                                int options = 0,
                                 time_t pin_in_cache = (time_t) 0,
                                 char *hostname = 0, int host_len = 0);
   Action *open_write_buffer(Continuation *cont, MIOBuffer *buf,
                             CacheKey *key, 
                             CacheFragType frag_type = CACHE_FRAG_TYPE_NONE, 
-                            bool overwrite = false,
+                            int options = 0,
                             time_t pin_in_cache = (time_t) 0,
                             char *hostname = 0, int host_len = 0);
   inkcoreapi Action *remove(Continuation *cont, CacheKey *key,

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h Thu Mar  4 18:53:09 2010
@@ -44,6 +44,5 @@
 #include "P_CacheInternal.h"
 #include "P_CacheHosting.h"
 #include "P_CacheHttp.h"
-#include "P_CacheTest.h"
 #include "NewCacheVC.h"
 #endif /* _P_CACHE_H */

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h Thu Mar  4 18:53:09 2010
@@ -214,6 +214,7 @@
 extern int cache_config_read_while_writer;
 extern char cache_system_config_directory[PATH_NAME_MAX + 1];
 extern int cache_clustering_enabled;
+extern int cache_config_agg_write_backlog;
 #ifdef HIT_EVACUATE
 extern int cache_config_hit_evacuate_percent;
 extern int cache_config_hit_evacuate_size_limit;
@@ -243,10 +244,13 @@
   }
   int get_header(void **ptr, int *len) 
   {
-    Doc *doc = (Doc*)first_buf->data();
-    *ptr = doc->hdr();
-    *len = doc->hlen;   
-    return 0;
+    if (first_buf.m_ptr) {
+      Doc *doc = (Doc*)first_buf->data();
+      *ptr = doc->hdr();
+      *len = doc->hlen;   
+      return 0;
+    } else
+      return -1;
   }
   int set_header(void *ptr, int len) 
   {
@@ -269,6 +273,7 @@
   int do_write_call();
   int do_write_lock();
   int do_write_lock_call();
+  int do_sync(inku32 target_write_serial);
 
   int openReadClose(int event, Event *e);
   int openReadReadDone(int event, Event *e);
@@ -398,6 +403,7 @@
   int frag_len;         // for communicating with agg_copy
   inku32 write_len;     // for communicating with agg_copy
   inku32 agg_len;       // for communicating with aggWrite
+  inku32 write_serial;  // serial of the final write for SYNC
   Frag *frag;           // arraylist of fragment offset
   Frag integral_frags[INTEGRAL_FRAGS];
   Part *part;
@@ -435,13 +441,12 @@
     {
       unsigned int use_first_key:1;
       unsigned int overwrite:1; // overwrite first_key Dir if it exists
+      unsigned int close_complete:1; // WRITE_COMPLETE is final
+      unsigned int sync:1; // write to be committed to durable storage before WRITE_COMPLETE
       unsigned int evacuator:1;
       unsigned int single_fragment:1;
       unsigned int evac_vector:1;
       unsigned int lookup:1;
-#ifdef HIT_EVACUATE
-      unsigned int hit_evacuate:1;
-#endif
       unsigned int update:1;
       unsigned int remove:1;
       unsigned int remove_aborted_writers:1;
@@ -451,6 +456,9 @@
       unsigned int not_from_ram_cache:1;        // entire doc was from ram cache
       unsigned int rewrite_resident_alt:1;
       unsigned int readers:1;
+#ifdef HIT_EVACUATE
+      unsigned int hit_evacuate:1;
+#endif
     } f;
   };
   //end region C
@@ -923,7 +931,7 @@
   Action *lookup(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len);
   inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int len);
   inkcoreapi Action *open_write(Continuation *cont, CacheKey *key,
-                                CacheFragType frag_type, bool overwrite = false,
+                                CacheFragType frag_type, int options = 0,
                                 time_t pin_in_cache = (time_t) 0, char *hostname = 0, int host_len = 0);
   inkcoreapi Action *remove(Continuation *cont, CacheKey *key,
                             CacheFragType type = CACHE_FRAG_TYPE_HTTP, 
@@ -1109,7 +1117,7 @@
 
 inline inkcoreapi Action *
 CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, 
-                           int expected_size, bool overwrite, time_t pin_in_cache, 
+                           int expected_size, int options, time_t pin_in_cache, 
                            char *hostname, int host_len)
 {
   (void) expected_size;
@@ -1118,27 +1126,24 @@
 
   if (m && (cache_clustering_enabled > 0)) {
     return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
-                         key, frag_type, overwrite, pin_in_cache,
+                         key, frag_type, options, pin_in_cache,
                          CACHE_OPEN_WRITE, key, (CacheURL *) 0,
                          (CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len);
   }
 #endif
-  return caches[frag_type]->open_write(cont, key, frag_type, overwrite, pin_in_cache, hostname, host_len);
-
+  return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
 }
 
 inline Action *
-CacheProcessor::open_write_buffer(Continuation *cont, MIOBuffer *buf,
-                                  CacheKey *key, 
-                                  CacheFragType frag_type, 
-                                  bool overwrite, time_t pin_in_cache,
+CacheProcessor::open_write_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key, 
+                                  CacheFragType frag_type, int options, time_t pin_in_cache, 
                                   char *hostname, int host_len) 
 {
   (void)cont;
   (void)buf;
   (void)key;
   (void)frag_type;
-  (void)overwrite;
+  (void)options;
   (void)hostname;
   (void)host_len;
   ink_assert(!"implemented");

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h Thu Mar  4 18:53:09 2010
@@ -150,6 +150,7 @@
 
   Queue<CacheVC, Continuation::Link_link> agg;
   Queue<CacheVC, Continuation::Link_link> stat_cache_vcs;
+  Queue<CacheVC, Continuation::Link_link> sync;
   char *agg_buffer;
   int agg_todo_size;
   int agg_buf_pos;
@@ -171,11 +172,11 @@
   inku32 last_sync_serial;
   inku32 last_write_serial;
   bool recover_wrapped;
-  int dir_sync_waiting;
-  int dir_sync_in_progress;
+  bool dir_sync_waiting;
+  bool dir_sync_in_progress;
+  bool writing_end_marker;
   RamCacheEntry first_fragment;
 
-
   void cancel_trigger();
 
   int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
@@ -240,11 +241,11 @@
   EvacuationBlock *force_evacuate_head(Dir *dir, int pinned);
   int within_hit_evacuate_window(Dir *dir);
 
-Part():Continuation(new_ProxyMutex()), path(NULL), fd(-1),
-    dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), skip(0), start(0),
-    len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0),
-    evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false),
-    dir_sync_waiting(0), dir_sync_in_progress(0) {
+  Part():Continuation(new_ProxyMutex()), path(NULL), fd(-1),
+         dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), skip(0), start(0),
+         len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0),
+         evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false),
+         dir_sync_waiting(0), dir_sync_in_progress(0), writing_end_marker(0) {
     open_dir.mutex = mutex;
 #if defined(_WIN32)
     agg_buffer = (char *) malloc(AGG_SIZE);

Modified: incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h Thu Mar  4 18:53:09 2010
@@ -63,9 +63,12 @@
                   xprev_host_prob(0), xnext_host_prob(0) {}
 };
 
-struct CacheTestSM;
+struct CacheTestHeader {
+  inku64 serial;
+};
 
 struct CacheTestSM : RegressionSM {
+  int start_memcpy_on_clone; // place all variables to be copied between these markers
   Action *timeout;
   Action *cache_action;
   ink_hrtime start_time;
@@ -84,18 +87,9 @@
   int expect_event;
   int expect_initial_event;
   int initial_event;
-  union
-  {
-    unsigned int flags;
-    struct
-    {
-      unsigned int http_request:1;
-      unsigned int writing:1;
-      unsigned int update:1;
-      unsigned int hit:1;
-      unsigned int remove:1;
-    } f;
-  };
+  inku64 content_salt;
+  CacheTestHeader header;
+  int end_memcpy_on_clone; // place all variables to be copied between these markers
 
   void fill_buffer();
   int check_buffer();
@@ -107,6 +101,8 @@
     make_request_internal();
   }
   virtual void make_request_internal() = 0;
+  virtual int open_read_callout();
+  virtual int open_write_callout();
 
   void cancel_timeout() {
     if (timeout) timeout->cancel();

Modified: incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h Thu Mar  4 18:53:09 2010
@@ -169,7 +169,7 @@
 inline Action *
 Cluster_write(Continuation * cont, int expected_size,
               MIOBuffer * buf, ClusterMachine * m,
-              INK_MD5 * url_md5, CacheFragType ft, bool overwrite,
+              INK_MD5 * url_md5, CacheFragType ft, int options,
               time_t pin_in_cache, int opcode,
               CacheKey * key, CacheURL * url,
               CacheHTTPHdr * request, CacheHTTPInfo * old_info, char *hostname, int host_len)
@@ -258,7 +258,7 @@
     writeArgs.url_md5 = url_md5;
     writeArgs.pin_in_cache = pin_in_cache;
     writeArgs.frag_type = ft;
-    writeArgs.cfl_flags |= (overwrite ? CFL_OVERWRITE_ON_WRITE : 0);
+    writeArgs.cfl_flags |= (options & CACHE_WRITE_OPT_OVERWRITE ? CFL_OVERWRITE_ON_WRITE : 0);
     writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
     writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);