You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2010/03/04 19:53:09 UTC
svn commit: r919125 - in /incubator/trafficserver/traffic/trunk/iocore:
cache/ cluster/
Author: jplevyak
Date: Thu Mar 4 18:53:09 2010
New Revision: 919125
URL: http://svn.apache.org/viewvc?rev=919125&view=rev
Log:
TS-222: Cache::open_write should take options arg and allow SYNC writes
Modified:
incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc
incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc
incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc
incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc
incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h
incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h
incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h
incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h
incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h
incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/Cache.cc Thu Mar 4 18:53:09 2010
@@ -253,9 +253,11 @@
CacheVC::do_io_close(int alerrno)
{
ink_debug_assert(mutex->thread_holding == this_ethread());
+ int previous_closed = closed;
closed = (alerrno == -1) ? 1 : -1; // Stupid default arguments
DDebug("cache_close", "do_io_close %lX %d %d", (long) this, alerrno, closed);
- die();
+ if (!previous_closed && !recursive)
+ die();
}
void
@@ -338,7 +340,6 @@
void
CacheVC::set_http_info(CacheHTTPInfo *ainfo)
{
-
ink_assert(!total_len);
if (f.update) {
ainfo->object_key_set(update_key);
@@ -354,7 +355,6 @@
bool CacheVC::set_pin_in_cache(time_t time_pin)
{
-
if (total_len) {
ink_assert(!"should Pin the document before writing");
return false;
@@ -1056,7 +1056,6 @@
int
Part::handle_dir_clear(int event, void *data)
{
-
int dir_len = part_dirlen(this);
AIOCallback *op;
@@ -1186,7 +1185,6 @@
were written to just before syncing the directory) and make sure
that all documents have write_serial <= header->write_serial.
*/
-
int to_check = header->write_pos - header->last_write_pos;
ink_assert(to_check && to_check < (int) io.aiocb.aio_nbytes);
int done = 0;
@@ -1198,7 +1196,6 @@
goto Lclear;
}
done += round_to_approx_size(doc->len);
-
if (doc->sync_serial > last_write_serial)
last_sync_serial = doc->sync_serial;
}
@@ -2798,7 +2795,6 @@
}
}
#endif
-
// cache plugin
if (cache_global_hooks != NULL && cache_global_hooks->hooks_set > 0) {
Debug("cache_plugin", "[CacheProcessor::open_write] Cache hooks are set, old_info=%lX", (long) old_info);
@@ -2819,8 +2815,6 @@
return ACTION_RESULT_DONE;
}
}
-
-
return caches[type]->open_write(cont, url, request, old_info, pin_in_cache, type);
}
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/CacheRead.cc Thu Mar 4 18:53:09 2010
@@ -1022,12 +1022,6 @@
} else
#endif
{
- // non-http docs have the total len set in the first fragment
- if (doc->hlen) {
- ink_debug_assert(!"Cache::openReadStartHead non-http request" " for http doc");
- err = -ECACHE_BAD_READ_REQUEST;
- goto Ldone;
- }
next_CacheKey(&key, &doc->key);
f.single_fragment = doc->single_fragment();
doc_pos = doc->prefix_len();
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/CacheTest.cc Thu Mar 4 18:53:09 2010
@@ -56,7 +56,7 @@
expect_event(EVENT_NONE),
expect_initial_event(EVENT_NONE),
initial_event(EVENT_NONE),
- flags(0)
+ content_salt(0)
{
SET_HANDLER(&CacheTestSM::event_handler);
}
@@ -70,6 +70,16 @@
free_MIOBuffer(buffer);
}
+int CacheTestSM::open_read_callout() {
+ cvio = cache_vc->do_io_read(this, total_size, buffer);
+ return 1;
+}
+
+int CacheTestSM::open_write_callout() {
+ cvio = cache_vc->do_io_write(this, total_size, buffer_reader);
+ return 1;
+}
+
int CacheTestSM::event_handler(int event, void *data) {
switch (event) {
@@ -100,8 +110,10 @@
cache_vc = (CacheVConnection*)data;
buffer = new_empty_MIOBuffer();
buffer_reader = buffer->alloc_reader();
- cvio = cache_vc->do_io_read(this, total_size, buffer);
- return EVENT_DONE;
+ if (open_read_callout() < 0)
+ goto Lclose_error_next;
+ else
+ return EVENT_DONE;
case CACHE_EVENT_OPEN_READ_FAILED:
goto Lcancel_next;
@@ -129,8 +141,10 @@
cache_vc = (CacheVConnection*)data;
buffer = new_empty_MIOBuffer();
buffer_reader = buffer->alloc_reader();
- cvio = cache_vc->do_io_write(this, total_size, buffer_reader);
- return EVENT_DONE;
+ if (open_write_callout() < 0)
+ goto Lclose_error_next;
+ else
+ return EVENT_DONE;
case CACHE_EVENT_OPEN_WRITE_FAILED:
goto Lcancel_next;
@@ -209,6 +223,7 @@
void CacheTestSM::fill_buffer() {
ink64 avail = buffer->write_avail();
CacheKey k = key;
+ k.b[1] += content_salt;
ink64 sk = (ink64)sizeof(key);
while (avail > 0) {
ink64 l = avail;
@@ -229,6 +244,7 @@
int CacheTestSM::check_buffer() {
ink64 avail = buffer_reader->read_avail();
CacheKey k = key;
+ k.b[1] += content_salt;
char b[sizeof(key)];
ink64 sk = (ink64)sizeof(key);
ink64 pos = cvio->ndone - buffer_reader->read_avail();
@@ -267,25 +283,9 @@
}
CacheTestSM::CacheTestSM(const CacheTestSM &ao) : RegressionSM(ao) {
- timeout = ao.timeout;
- cache_action = ao.cache_action;
- start_time = ao.start_time;
- cache_vc = ao.cache_vc;
- cvio = ao.cvio;
- buffer = ao.buffer;
- buffer_reader = ao.buffer_reader;
-#ifdef HTTP_CACHE
- params = ao.params;
- info = ao.info;
-#endif
- total_size = ao.total_size;
- memcpy(urlstr, ao.urlstr, 1024);
- key = ao.key;
- repeat_count = ao.repeat_count;
- expect_event = ao.expect_event;
- expect_initial_event = ao.expect_initial_event;
- initial_event = ao.initial_event;
- flags = ao.flags;
+ int o = (int)(((char*)&start_memcpy_on_clone) - ((char*)this));
+ int s = (int)(((char*)&end_memcpy_on_clone) - ((char*)&start_memcpy_on_clone));
+ memcpy(((char*)this)+o, ((char*)&ao)+o, s);
SET_HANDLER(&CacheTestSM::event_handler);
}
@@ -298,7 +298,9 @@
EThread *thread = this_ethread();
- CACHE_SM(t, write_test, { cacheProcessor.open_write(this, &key); } );
+ CACHE_SM(t, write_test, { cacheProcessor.open_write(
+ this, &key, CACHE_FRAG_TYPE_NONE, 100,
+ CACHE_WRITE_OPT_SYNC); } );
write_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
write_test.expect_event = VC_EVENT_WRITE_COMPLETE;
write_test.total_size = 100;
@@ -330,6 +332,62 @@
remove_fail_test.expect_event = CACHE_EVENT_REMOVE_FAILED;
rand_CacheKey(&remove_fail_test.key, thread->mutex);
+ CACHE_SM(t, replace_write_test, {
+ cacheProcessor.open_write(this, &key, CACHE_FRAG_TYPE_NONE, 100,
+ CACHE_WRITE_OPT_SYNC);
+ }
+ int open_write_callout() {
+ header.serial = 10;
+ cache_vc->set_header(&header, sizeof(header));
+ cvio = cache_vc->do_io_write(this, total_size, buffer_reader);
+ return 1;
+ });
+ replace_write_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
+ replace_write_test.expect_event = VC_EVENT_WRITE_COMPLETE;
+ replace_write_test.total_size = 100;
+ rand_CacheKey(&replace_write_test.key, thread->mutex);
+
+ CACHE_SM(t, replace_test, {
+ cacheProcessor.open_write(this, &key, CACHE_FRAG_TYPE_NONE, 100,
+ CACHE_WRITE_OPT_OVERWRITE_SYNC);
+ }
+ int open_write_callout() {
+ CacheTestHeader *h = 0;
+ int hlen = 0;
+ if (cache_vc->get_header((void**)&h, &hlen) < 0)
+ return -1;
+ if (h->serial != 10)
+ return -1;
+ header.serial = 11;
+ cache_vc->set_header(&header, sizeof(header));
+ cvio = cache_vc->do_io_write(this, total_size, buffer_reader);
+ return 1;
+ });
+ replace_test.expect_initial_event = CACHE_EVENT_OPEN_WRITE;
+ replace_test.expect_event = VC_EVENT_WRITE_COMPLETE;
+ replace_test.total_size = 100;
+ replace_test.key = replace_write_test.key;
+ replace_test.content_salt = 1;
+
+ CACHE_SM(t, replace_read_test, {
+ cacheProcessor.open_read(this, &key);
+ }
+ int open_read_callout() {
+ CacheTestHeader *h = 0;
+ int hlen = 0;
+ if (cache_vc->get_header((void**)&h, &hlen) < 0)
+ return -1;
+ if (h->serial != 11)
+ return -1;
+ cvio = cache_vc->do_io_read(this, total_size, buffer);
+ return 1;
+ });
+ replace_read_test.expect_initial_event = CACHE_EVENT_OPEN_READ;
+ replace_read_test.expect_event = VC_EVENT_READ_COMPLETE;
+ replace_read_test.total_size = 100;
+ replace_read_test.key = replace_test.key;
+ replace_read_test.content_salt = 1;
+
r_sequential(
t,
write_test.clone(),
@@ -339,6 +397,9 @@
lookup_fail_test.clone(),
read_fail_test.clone(),
remove_fail_test.clone(),
+ replace_write_test.clone(),
+ replace_test.clone(),
+ replace_read_test.clone(),
NULL
)->run(pstatus);
return;
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/CacheWrite.cc Thu Mar 4 18:53:09 2010
@@ -24,23 +24,10 @@
#include "P_Cache.h"
-
-#define HDR_PTR_SIZE (sizeof(inku64))
-#define HDR_PTR_ALIGNMENT_MASK ((HDR_PTR_SIZE) - 1L)
-
-extern int cache_config_agg_write_backlog;
-
-static inline int power_of_2(inku32 x) {
- while (x) {
- if (x & 1) {
- if (x >> 1)
- return 0;
- return 1;
- }
- x >>= 1;
- }
- return 1;
-}
+#define IS_POWER_2(_x) (!((_x)&((_x)-1)))
+#define UINT_WRAP_LTE(_x, _y) (((_y)-(_x)) < INT_MAX) // exploit overflow
+#define UINT_WRAP_GTE(_x, _y) (((_x)-(_y)) < INT_MAX) // exploit overflow
+#define UINT_WRAP_LT(_x, _y) (((_x)-(_y)) >= INT_MAX) // exploit overflow
// Given a key, finds the index of the alternate which matches
// used to get the alternate which is actually present in the document
@@ -196,7 +183,7 @@
ink_assert(od->writing_vec);
header_len = write_vector->marshal_length();
ink_assert(header_len > 0);
- } else
+ } else if (f.use_first_key)
#endif
header_len = header_to_write_len;
if (f.use_first_key && fragment) {
@@ -331,11 +318,10 @@
cancel_trigger();
+ // ensure we have the cacheDirSync lock if we intend to call it later
+ // retaking the current mutex recursively is a NOOP
CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
if (!lock) {
- // INKqa10347
- // Race condition between cacheDirSync and the part when setting the
- // dir_sync_waiting flag
eventProcessor.schedule_in(this, MUTEX_RETRY_DELAY);
return EVENT_CONT;
}
@@ -369,11 +355,21 @@
agg_buf_pos = 0;
}
set_io_not_in_progress();
+ // callback ready sync CacheVCs
+ CacheVC *c = 0;
+ while ((c = sync.dequeue())) {
+ if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial))
+ c->initial_thread->schedule_imm(c, AIO_EVENT_DONE);
+ else {
+ sync.push(c); // put it back on the front
+ break;
+ }
+ }
if (dir_sync_waiting) {
dir_sync_waiting = 0;
cacheDirSync->handleEvent(EVENT_IMMEDIATE, 0);
}
- if (agg.head)
+ if (agg.head || sync.head)
return aggWrite(event, e);
return EVENT_CONT;
}
@@ -774,7 +770,7 @@
doc->total_len = vc->total_len;
doc->first_key = vc->first_key;
doc->sync_serial = part->header->sync_serial;
- doc->write_serial = part->header->write_serial;
+ vc->write_serial = doc->write_serial = part->header->write_serial;
doc->checksum = DOC_NO_CHECKSUM;
if (vc->pin_in_cache) {
dir_set_pinned(&vc->dir, 1);
@@ -963,13 +959,13 @@
periodic_scan();
}
-/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
+/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
DON'T schedule any events on this thread using VC_SCHED_XXX or
mutex->thread_holding->schedule_xxx_local(). ALWAYS use
eventProcessor.schedule_xxx().
Also, make sure that any functions called by this also use
the eventProcessor to schedule events
- */
+*/
int
Part::aggWrite(int event, void *e)
{
@@ -998,7 +994,12 @@
agg_buf_pos += writelen;
CacheVC *n = (CacheVC *)c->link.next;
agg.dequeue();
- if (c->f.evacuator)
+ if (c->f.sync && c->f.use_first_key) {
+ CacheVC *last = sync.tail;
+ while (last && UINT_WRAP_LT(c->write_serial, last->write_serial))
+ last = (CacheVC*)last->link.prev;
+ sync.insert(c, last);
+ } else if (c->f.evacuator)
c->handleEvent(AIO_EVENT_DONE, 0);
else
tocall.enqueue(c);
@@ -1007,7 +1008,7 @@
// if we got nothing...
if (!agg_buf_pos) {
- if (!agg.head) // nothing to get
+ if (!agg.head && !sync.head) // nothing to get
return EVENT_CONT;
if (header->write_pos == start) {
// write aggregation too long, bad bad, punt on everything.
@@ -1021,8 +1022,10 @@
return EVENT_CONT;
}
// start back
- agg_wrap();
- goto Lagain;
+ if (agg.head) {
+ agg_wrap();
+ goto Lagain;
+ }
}
// evacuate space
@@ -1035,9 +1038,22 @@
// if agg.head, then we are near the end of the disk, so
// write down the aggregation in whatever size it is.
- if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !dir_sync_waiting)
+ if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting)
goto Lwait;
+ // write sync marker
+ if (!agg_buf_pos) {
+ ink_assert(sync.head);
+ int l = round_to_approx_size(sizeof(Doc));
+ agg_buf_pos = l;
+ Doc *d = (Doc*)agg_buffer;
+ memset(d, 0, sizeof(Doc));
+ d->magic = DOC_MAGIC;
+ d->len = l;
+ d->sync_serial = header->sync_serial;
+ d->write_serial = header->write_serial;
+ }
+
// set write limit
header->agg_pos = header->write_pos + agg_buf_pos;
@@ -1112,6 +1128,12 @@
default: CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat); break;
}
}
+ if (f.close_complete) {
+ recursive++;
+ ink_debug_assert(!part || this_ethread() != part->mutex->thread_holding);
+ vio._cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *) &vio);
+ recursive--;
+ }
return free_CacheVC(this);
}
@@ -1119,14 +1141,14 @@
CacheVC::openWriteCloseHeadDone(int event, Event *e)
{
NOWARN_UNUSED(e);
+ if (event == AIO_EVENT_DONE)
+ set_io_not_in_progress();
+ else if (is_io_in_progress())
+ return EVENT_CONT;
{
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
VC_LOCK_RETRY_EVENT();
- if (event == AIO_EVENT_DONE)
- set_io_not_in_progress();
- else if (is_io_in_progress())
- return EVENT_CONT;
od->writing_vec = 0;
if (!io.ok())
goto Lclose;
@@ -1282,7 +1304,7 @@
if (!frag)
frag = &integral_frags[0];
else {
- if (fragment-1 >= INTEGRAL_FRAGS && power_of_2((inku32)(fragment-1))) {
+ if (fragment-1 >= INTEGRAL_FRAGS && IS_POWER_2((inku32)(fragment-1))) {
Frag *t = frag;
frag = (Frag*)xmalloc(sizeof(Frag) * (fragment-1)*2);
memcpy(frag, t, sizeof(Frag) * (fragment-2));
@@ -1324,6 +1346,7 @@
called_user = 1;
if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE)
return EVENT_DONE;
+ ink_assert(!"close expected after write COMPLETE");
if (vio.ntodo() <= 0)
return EVENT_CONT;
}
@@ -1365,7 +1388,11 @@
}
if (not_writing)
return EVENT_CONT;
-
+ if (towrite == ntodo && f.close_complete) {
+ closed = 1;
+ SET_HANDLER(&CacheVC::openWriteClose);
+ return openWriteClose(EVENT_NONE, NULL);
+ }
SET_HANDLER(&CacheVC::openWriteWriteDone);
return do_write_lock_call();
}
@@ -1393,6 +1420,7 @@
if (!(doc->first_key == first_key))
goto Lcollision;
od->first_dir = dir;
+ first_buf = buf;
goto Ldone;
}
Lcollision:
@@ -1543,22 +1571,19 @@
cancel_trigger();
if (_action.cancelled)
return free_CacheVC(this);
- CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (!lock)
- VC_SCHED_LOCK_RETRY();
- if ((err = part->open_write(this, false, 1)) > 0) {
+ if ((err = part->open_write_lock(this, false, 1) > 0)) {
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
free_CacheVC(this);
- MUTEX_RELEASE(lock);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *) -err);
return 0;
}
+ if (err < 0)
+ VC_SCHED_LOCK_RETRY();
if (f.overwrite) {
- SET_HANDLER(&CacheVC::openWriteOverwrite);
+ SET_HANDLER(&CacheVC::openWriteOverwrite);
return openWriteOverwrite(EVENT_IMMEDIATE, 0);
} else {
- MUTEX_RELEASE(lock);
- // write by key
+ // write by key
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
}
@@ -1567,7 +1592,7 @@
// main entry point for writing of of non-http documents
Action *
Cache::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
- bool overwrite, time_t apin_in_cache, char *hostname, int host_len)
+ int options, time_t apin_in_cache, char *hostname, int host_len)
{
if (!(CacheProcessor::cache_ready & frag_type)) {
@@ -1601,7 +1626,9 @@
#ifdef HTTP_CACHE
c->info = 0;
#endif
- c->f.overwrite = overwrite;
+ c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
+ c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
+ c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
c->pin_in_cache = (inku32) apin_in_cache;
if ((res = c->part->open_write_lock(c, false, 1)) > 0) {
@@ -1616,7 +1643,7 @@
c->trigger = CONT_SCHED_LOCK_RETRY(c);
return &c->_action;
}
- if (!overwrite) {
+ if (!c->f.overwrite) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/I_Cache.h Thu Mar 4 18:53:09 2010
@@ -37,6 +37,11 @@
CACHE_MODULE_MINOR_VERSION,\
PUBLIC_MODULE_HEADER)
+#define CACHE_WRITE_OPT_OVERWRITE 0x0001
+#define CACHE_WRITE_OPT_CLOSE_COMPLETE 0x0002
+#define CACHE_WRITE_OPT_SYNC (CACHE_WRITE_OPT_CLOSE_COMPLETE | 0x0004)
+#define CACHE_WRITE_OPT_OVERWRITE_SYNC (CACHE_WRITE_OPT_SYNC | CACHE_WRITE_OPT_OVERWRITE)
+
class CacheLookupHttpConfig;
class CacheVC;
#ifdef HTTP_CACHE
@@ -70,13 +75,13 @@
CacheKey *key,
CacheFragType frag_type = CACHE_FRAG_TYPE_NONE,
int expected_size = CACHE_EXPECTED_SIZE,
- bool overwrite = false,
+ int options = 0,
time_t pin_in_cache = (time_t) 0,
char *hostname = 0, int host_len = 0);
Action *open_write_buffer(Continuation *cont, MIOBuffer *buf,
CacheKey *key,
CacheFragType frag_type = CACHE_FRAG_TYPE_NONE,
- bool overwrite = false,
+ int options = 0,
time_t pin_in_cache = (time_t) 0,
char *hostname = 0, int host_len = 0);
inkcoreapi Action *remove(Continuation *cont, CacheKey *key,
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/P_Cache.h Thu Mar 4 18:53:09 2010
@@ -44,6 +44,5 @@
#include "P_CacheInternal.h"
#include "P_CacheHosting.h"
#include "P_CacheHttp.h"
-#include "P_CacheTest.h"
#include "NewCacheVC.h"
#endif /* _P_CACHE_H */
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheInternal.h Thu Mar 4 18:53:09 2010
@@ -214,6 +214,7 @@
extern int cache_config_read_while_writer;
extern char cache_system_config_directory[PATH_NAME_MAX + 1];
extern int cache_clustering_enabled;
+extern int cache_config_agg_write_backlog;
#ifdef HIT_EVACUATE
extern int cache_config_hit_evacuate_percent;
extern int cache_config_hit_evacuate_size_limit;
@@ -243,10 +244,13 @@
}
int get_header(void **ptr, int *len)
{
- Doc *doc = (Doc*)first_buf->data();
- *ptr = doc->hdr();
- *len = doc->hlen;
- return 0;
+ if (first_buf.m_ptr) {
+ Doc *doc = (Doc*)first_buf->data();
+ *ptr = doc->hdr();
+ *len = doc->hlen;
+ return 0;
+ } else
+ return -1;
}
int set_header(void *ptr, int len)
{
@@ -269,6 +273,7 @@
int do_write_call();
int do_write_lock();
int do_write_lock_call();
+ int do_sync(inku32 target_write_serial);
int openReadClose(int event, Event *e);
int openReadReadDone(int event, Event *e);
@@ -398,6 +403,7 @@
int frag_len; // for communicating with agg_copy
inku32 write_len; // for communicating with agg_copy
inku32 agg_len; // for communicating with aggWrite
+ inku32 write_serial; // serial of the final write for SYNC
Frag *frag; // arraylist of fragment offset
Frag integral_frags[INTEGRAL_FRAGS];
Part *part;
@@ -435,13 +441,12 @@
{
unsigned int use_first_key:1;
unsigned int overwrite:1; // overwrite first_key Dir if it exists
+ unsigned int close_complete:1; // WRITE_COMPLETE is final
+ unsigned int sync:1; // write to be committed to durable storage before WRITE_COMPLETE
unsigned int evacuator:1;
unsigned int single_fragment:1;
unsigned int evac_vector:1;
unsigned int lookup:1;
-#ifdef HIT_EVACUATE
- unsigned int hit_evacuate:1;
-#endif
unsigned int update:1;
unsigned int remove:1;
unsigned int remove_aborted_writers:1;
@@ -451,6 +456,9 @@
unsigned int not_from_ram_cache:1; // entire doc was from ram cache
unsigned int rewrite_resident_alt:1;
unsigned int readers:1;
+#ifdef HIT_EVACUATE
+ unsigned int hit_evacuate:1;
+#endif
} f;
};
//end region C
@@ -923,7 +931,7 @@
Action *lookup(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int host_len);
inkcoreapi Action *open_read(Continuation *cont, CacheKey *key, CacheFragType type, char *hostname, int len);
inkcoreapi Action *open_write(Continuation *cont, CacheKey *key,
- CacheFragType frag_type, bool overwrite = false,
+ CacheFragType frag_type, int options = 0,
time_t pin_in_cache = (time_t) 0, char *hostname = 0, int host_len = 0);
inkcoreapi Action *remove(Continuation *cont, CacheKey *key,
CacheFragType type = CACHE_FRAG_TYPE_HTTP,
@@ -1109,7 +1117,7 @@
inline inkcoreapi Action *
CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type,
- int expected_size, bool overwrite, time_t pin_in_cache,
+ int expected_size, int options, time_t pin_in_cache,
char *hostname, int host_len)
{
(void) expected_size;
@@ -1118,27 +1126,24 @@
if (m && (cache_clustering_enabled > 0)) {
return Cluster_write(cont, expected_size, (MIOBuffer *) 0, m,
- key, frag_type, overwrite, pin_in_cache,
+ key, frag_type, options, pin_in_cache,
CACHE_OPEN_WRITE, key, (CacheURL *) 0,
(CacheHTTPHdr *) 0, (CacheHTTPInfo *) 0, hostname, host_len);
}
#endif
- return caches[frag_type]->open_write(cont, key, frag_type, overwrite, pin_in_cache, hostname, host_len);
-
+ return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len);
}
inline Action *
-CacheProcessor::open_write_buffer(Continuation *cont, MIOBuffer *buf,
- CacheKey *key,
- CacheFragType frag_type,
- bool overwrite, time_t pin_in_cache,
+CacheProcessor::open_write_buffer(Continuation *cont, MIOBuffer *buf, CacheKey *key,
+ CacheFragType frag_type, int options, time_t pin_in_cache,
char *hostname, int host_len)
{
(void)cont;
(void)buf;
(void)key;
(void)frag_type;
- (void)overwrite;
+ (void)options;
(void)hostname;
(void)host_len;
ink_assert(!"implemented");
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/P_CachePart.h Thu Mar 4 18:53:09 2010
@@ -150,6 +150,7 @@
Queue<CacheVC, Continuation::Link_link> agg;
Queue<CacheVC, Continuation::Link_link> stat_cache_vcs;
+ Queue<CacheVC, Continuation::Link_link> sync;
char *agg_buffer;
int agg_todo_size;
int agg_buf_pos;
@@ -171,11 +172,11 @@
inku32 last_sync_serial;
inku32 last_write_serial;
bool recover_wrapped;
- int dir_sync_waiting;
- int dir_sync_in_progress;
+ bool dir_sync_waiting;
+ bool dir_sync_in_progress;
+ bool writing_end_marker;
RamCacheEntry first_fragment;
-
void cancel_trigger();
int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
@@ -240,11 +241,11 @@
EvacuationBlock *force_evacuate_head(Dir *dir, int pinned);
int within_hit_evacuate_window(Dir *dir);
-Part():Continuation(new_ProxyMutex()), path(NULL), fd(-1),
- dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), skip(0), start(0),
- len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0),
- evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false),
- dir_sync_waiting(0), dir_sync_in_progress(0) {
+ Part():Continuation(new_ProxyMutex()), path(NULL), fd(-1),
+ dir(0), buckets(0), recover_pos(0), prev_recover_pos(0), scan_pos(0), skip(0), start(0),
+ len(0), data_blocks(0), hit_evacuate_window(0), agg_todo_size(0), agg_buf_pos(0), trigger(0),
+ evacuate_size(0), disk(NULL), last_sync_serial(0), last_write_serial(0), recover_wrapped(false),
+ dir_sync_waiting(0), dir_sync_in_progress(0), writing_end_marker(0) {
open_dir.mutex = mutex;
#if defined(_WIN32)
agg_buffer = (char *) malloc(AGG_SIZE);
Modified: incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cache/P_CacheTest.h Thu Mar 4 18:53:09 2010
@@ -63,9 +63,12 @@
xprev_host_prob(0), xnext_host_prob(0) {}
};
-struct CacheTestSM;
+struct CacheTestHeader {
+ inku64 serial;
+};
struct CacheTestSM : RegressionSM {
+ int start_memcpy_on_clone; // place all variables to be copied between these markers
Action *timeout;
Action *cache_action;
ink_hrtime start_time;
@@ -84,18 +87,9 @@
int expect_event;
int expect_initial_event;
int initial_event;
- union
- {
- unsigned int flags;
- struct
- {
- unsigned int http_request:1;
- unsigned int writing:1;
- unsigned int update:1;
- unsigned int hit:1;
- unsigned int remove:1;
- } f;
- };
+ inku64 content_salt;
+ CacheTestHeader header;
+ int end_memcpy_on_clone; // place all variables to be copied between these markers
void fill_buffer();
int check_buffer();
@@ -107,6 +101,8 @@
make_request_internal();
}
virtual void make_request_internal() = 0;
+ virtual int open_read_callout();
+ virtual int open_write_callout();
void cancel_timeout() {
if (timeout) timeout->cancel();
Modified: incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h?rev=919125&r1=919124&r2=919125&view=diff
==============================================================================
--- incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h (original)
+++ incubator/trafficserver/traffic/trunk/iocore/cluster/P_ClusterInline.h Thu Mar 4 18:53:09 2010
@@ -169,7 +169,7 @@
inline Action *
Cluster_write(Continuation * cont, int expected_size,
MIOBuffer * buf, ClusterMachine * m,
- INK_MD5 * url_md5, CacheFragType ft, bool overwrite,
+ INK_MD5 * url_md5, CacheFragType ft, int options,
time_t pin_in_cache, int opcode,
CacheKey * key, CacheURL * url,
CacheHTTPHdr * request, CacheHTTPInfo * old_info, char *hostname, int host_len)
@@ -258,7 +258,7 @@
writeArgs.url_md5 = url_md5;
writeArgs.pin_in_cache = pin_in_cache;
writeArgs.frag_type = ft;
- writeArgs.cfl_flags |= (overwrite ? CFL_OVERWRITE_ON_WRITE : 0);
+ writeArgs.cfl_flags |= (options & CACHE_WRITE_OPT_OVERWRITE ? CFL_OVERWRITE_ON_WRITE : 0);
writeArgs.cfl_flags |= (old_info ? CFL_LOPENWRITE_HAVE_OLDINFO : 0);
writeArgs.cfl_flags |= (allow_multiple_writes ? CFL_ALLOW_MULTIPLE_WRITES : 0);