You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2009/12/18 17:48:16 UTC
svn commit: r892310 [1/4] - in
/incubator/trafficserver/traffic/branches/dev: ./ iocore/aio/ iocore/cache/
iocore/cluster/ iocore/eventsystem/ iocore/net/ libinktomi++/ proxy/
proxy/http2/
Author: jplevyak
Date: Fri Dec 18 16:47:37 2009
New Revision: 892310
URL: http://svn.apache.org/viewvc?rev=892310&view=rev
Log:
TS-46: increase cache partition size to avoid write-aggregation seeks
Also drops the cache partition lock during callbacks to the user to reduce
contention on that lock.
Removed:
incubator/trafficserver/traffic/branches/dev/iocore/cache/cache_test.config
incubator/trafficserver/traffic/branches/dev/proxy/cache_test.config
Modified:
incubator/trafficserver/traffic/branches/dev/ (props changed)
incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheHttp.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheLink.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePages.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePagesInternal.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/I_Cache.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/I_CacheDefs.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/NewCacheVC.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheDir.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheDisk.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheHttp.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CachePart.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/P_RamCache.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/RamCache.cc
incubator/trafficserver/traffic/branches/dev/iocore/cluster/ClusterCache.cc
incubator/trafficserver/traffic/branches/dev/iocore/cluster/ClusterConfig.cc
incubator/trafficserver/traffic/branches/dev/iocore/cluster/ClusterHandler.cc
incubator/trafficserver/traffic/branches/dev/iocore/cluster/ClusterHandlerBase.cc
incubator/trafficserver/traffic/branches/dev/iocore/cluster/ClusterProcessor.cc
incubator/trafficserver/traffic/branches/dev/iocore/cluster/ClusterVConnection.cc
incubator/trafficserver/traffic/branches/dev/iocore/cluster/P_ClusterCache.h
incubator/trafficserver/traffic/branches/dev/iocore/cluster/P_ClusterCacheInternal.h
incubator/trafficserver/traffic/branches/dev/iocore/eventsystem/I_EThread.h
incubator/trafficserver/traffic/branches/dev/iocore/eventsystem/I_IOBuffer.h
incubator/trafficserver/traffic/branches/dev/iocore/eventsystem/I_VConnection.h
incubator/trafficserver/traffic/branches/dev/iocore/eventsystem/P_IOBuffer.h
incubator/trafficserver/traffic/branches/dev/iocore/net/P_UnixNetVConnection.h
incubator/trafficserver/traffic/branches/dev/iocore/net/UnixNetVConnection.cc
incubator/trafficserver/traffic/branches/dev/libinktomi++/ink_port.h
incubator/trafficserver/traffic/branches/dev/libinktomi++/ink_queue.cc
incubator/trafficserver/traffic/branches/dev/libinktomi++/ink_queue.h
incubator/trafficserver/traffic/branches/dev/libinktomi++/ink_queue_utils.c
incubator/trafficserver/traffic/branches/dev/proxy/ICP.cc
incubator/trafficserver/traffic/branches/dev/proxy/Main.cc
incubator/trafficserver/traffic/branches/dev/proxy/http2/HttpSM.cc
incubator/trafficserver/traffic/branches/dev/proxy/http2/HttpTransact.cc
Propchange: incubator/trafficserver/traffic/branches/dev/
------------------------------------------------------------------------------
svn:mergeinfo = /incubator/trafficserver/traffic/trunk:891822-892301
Modified: incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc Fri Dec 18 16:47:37 2009
@@ -30,25 +30,24 @@
****************************************************************************/
#include "P_AIO.h"
-// globals
-int ts_config_with_inkdiskio = 0;
-#define MAX_DISKS_POSSIBLE 100
-#define SLEEP_TIME 100
-
#define MAX_DISKS_POSSIBLE 100
#define SLEEP_TIME 100
+// globals
+
+int ts_config_with_inkdiskio = 0;
/* structure to hold information about each file descriptor */
AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
-
+/* number of unique file descriptors in the aio_reqs array */
+volatile int num_filedes = 0;
RecRawStatBlock *aio_rsb = NULL;
// acquire this mutex before inserting a new entry in the aio_reqs array.
// Don't need to acquire this for searching the array
static ink_mutex insert_mutex;
-
RecInt cache_config_threads_per_disk = 12;
static RecInt cache_config_aio_sleep_time = SLEEP_TIME;
+Continuation *aio_err_callbck = 0;
// AIO Stats
inku64 aio_num_read = 0;
@@ -56,13 +55,7 @@
inku64 aio_num_write = 0;
inku64 aio_bytes_written = 0;
-
-
-/* number of unique file descriptors in the aio_reqs array */
-volatile int num_filedes = 0;
-
-Continuation *aio_err_callbck = 0;
-
+static void aio_move(AIO_Reqs * req);
//////////////////////////////////////////////////////////////////////////
/////////////// INKIO /////////////////
@@ -193,18 +186,6 @@
inkaio_submit(get_kcb(this_ethread()));
}
-#if 0
-inline void
-print_thread_counters(Event * e, AIO_Reqs * req)
-{
- Warning
- ("Thread[%d] aio_pending: %d, aio_queued: %d, pending_size: %d, max_aio_queued: %d\naioread[%d] aioread_done[%d] aiowrite[%d] aiowritedone[%d] events_len[%d] events_len_done[%d]",
- e->ethread->id, req->pending, req->queued, pending_size, max_aio_queued, (get_kcb(e->ethread))->aioread,
- (get_kcb(e->ethread))->aioread_done, (get_kcb(e->ethread))->aiowrite, (get_kcb(e->ethread))->aiowrite_done,
- (get_kcb(e->ethread))->events_len, (get_kcb(e->ethread))->events_len_done);
-}
-#endif
-static void aio_move(AIO_Reqs * req);
int
DiskWriteMonitor::monitor_main(int event, Event * e)
{
@@ -550,10 +531,8 @@
{
AIOCallback *next = NULL, *prev = NULL, *cb = (AIOCallback *) ink_atomiclist_popall(&req->aio_temp_list);
/* flip the list */
-
if (!cb)
return;
-
while (cb->link.next) {
next = (AIOCallback *) cb->link.next;
cb->link.next = prev;
@@ -562,7 +541,6 @@
}
/* fix the last pointer */
cb->link.next = prev;
-
for (; cb; cb = next) {
next = (AIOCallback *) cb->link.next;
cb->link.next = NULL;
@@ -602,19 +580,16 @@
/* search for the matching file descriptor */
for (; thread_ndx < num_filedes; thread_ndx++) {
if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
-
/* found the matching file descriptor */
req = aio_reqs[thread_ndx];
break;
}
}
-
if (!req) {
ink_mutex_acquire(&insert_mutex);
if (thread_ndx == num_filedes) {
/* insert a new entry */
req = aio_init_fildes(op->aiocb.aio_fildes);
-
} else {
/* a new entry was inserted between the time we checked the
aio_reqs and acquired the mutex. check the aio_reqs array to
@@ -626,13 +601,10 @@
break;
}
}
- if (!req) {
+ if (!req)
req = aio_init_fildes(op->aiocb.aio_fildes);
- }
-
}
ink_mutex_release(&insert_mutex);
-
}
op->aio_req = req;
}
@@ -653,12 +625,10 @@
#ifdef AIO_STATS
ink_atomic_increment(&data->num_queue, 1);
#endif
- if (!INK_ATOMICLIST_EMPTY(req->aio_temp_list)) {
+ if (!INK_ATOMICLIST_EMPTY(req->aio_temp_list))
aio_move(req);
- }
/* now put the new request */
aio_insert(op, req);
-
ink_cond_signal(&req->aio_cond);
ink_mutex_release(&req->aio_mutex);
}
@@ -784,22 +754,16 @@
for (;;) {
do {
current_req = my_aio_req;
-
/* check if any pending requests on the atomic list */
-
- if (!INK_ATOMICLIST_EMPTY(my_aio_req->aio_temp_list)) {
+ if (!INK_ATOMICLIST_EMPTY(my_aio_req->aio_temp_list))
aio_move(my_aio_req);
- }
-
if (!(op = my_aio_req->aio_todo.pop()) && !(op = my_aio_req->http_aio_todo.pop()))
break;
-
#ifdef AIO_STATS
num_requests--;
current_req->queued--;
ink_atomic_increment((int *) ¤t_req->pending, 1);
#endif
-
// update the stats;
if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
aio_num_write++;
@@ -842,19 +806,12 @@
read.
*/
if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
- MUTEX_TRY_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
- if (!lock) {
- eventProcessor.schedule_imm(op);
- } else {
- if (!op->action.cancelled)
- op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
- }
- } else {
- eventProcessor.schedule_imm(op);
-
- }
+ MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
+ if (!op->action.cancelled)
+ op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
+ } else
+ op->thread->schedule_imm(op);
ink_mutex_acquire(&my_aio_req->aio_mutex);
-
} while (1);
if (timed_wait) {
timespec ts = ink_based_hrtime_to_timespec(ink_get_hrtime() + HRTIME_MSECONDS(cache_config_aio_sleep_time));
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc Fri Dec 18 16:47:37 2009
@@ -34,27 +34,24 @@
#ifdef HTTP_CACHE
#include "HttpTransactCache.h"
#endif
-// Compilation Options
#include "InkAPIInternal.h"
#include <HttpSM.h>
#include <HttpCacheSM.h>
+// Compilation Options
+
#define USELESS_REENABLES // allow them for now
-// #define USE_CACHE_OPEN_READ_DONE
// #define VERIFY_JTEST_DATA
-// Defines
-#define MAX_RECOVER_BYTES (1024 * 1024)
-
#define DOCACHE_CLEAR_DYN_STAT(x) \
do { \
RecSetRawStatSum(rsb, x, 0); \
RecSetRawStatCount(rsb, x, 0); \
} while (0);
+
// Configuration
-int cache_config_sem_key = 31717;
ink64 cache_config_ram_cache_size = AUTO_SIZE_RAM_CACHE;
int cache_config_http_max_alts = 3;
int cache_config_dir_sync_frequency = 60;
@@ -65,9 +62,7 @@
int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
ink64 cache_config_ram_cache_cutoff = 1048576; // 1 MB
ink64 cache_config_ram_cache_mixt_cutoff = 1048576; // 1 MB
-int cache_config_max_agg_delay = 1000;
int cache_config_max_disk_errors = 5;
-int cache_config_check_disk_idle = 1;
int cache_config_agg_write_backlog = 5242880;
#ifdef HIT_EVACUATE
int cache_config_hit_evacuate_percent = 10;
@@ -95,16 +90,14 @@
int CacheProcessor::start_internal_flags = 0;
int CacheProcessor::auto_clear_flag = 0;
CacheProcessor cacheProcessor;
-Part ** gpart = NULL;
+Part **gpart = NULL;
volatile int gnpart = 0;
ClassAllocator<CacheVC> cacheVConnectionAllocator("cacheVConnection");
ClassAllocator<NewCacheVC> newCacheVConnectionAllocator("newCacheVConnection");
ClassAllocator<EvacuationBlock> evacuationBlockAllocator("evacuationBlock");
ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator("cacheRemoveCont");
ClassAllocator<EvacuationKey> evacuationKeyAllocator("evacuationKey");
-
int CacheVC::size_to_init = -1;
-
CacheKey zero_key(0, 0);
struct PartInitInfo
@@ -113,13 +106,13 @@
AIOCallbackInternal part_aio[4];
char *part_h_f;
- PartInitInfo()
+ PartInitInfo()
{
recover_pos = 0;
if ((part_h_f = (char *) valloc(4 * INK_BLOCK_SIZE)) != NULL)
memset(part_h_f, 0, 4 * INK_BLOCK_SIZE);
}
- ~PartInitInfo()
+ ~PartInitInfo()
{
for (int i = 0; i < 4; i++) {
part_aio[i].action = NULL;
@@ -149,7 +142,7 @@
if (!gpart[i]->header->cycle)
used += gpart[i]->header->write_pos - gpart[i]->start;
else
- used += gpart[i]->len - part_dirlen(gpart[i]) - EVAC_SIZE;
+ used += gpart[i]->len - part_dirlen(gpart[i]) - EVACUATION_SIZE;
}
}
return used;
@@ -160,7 +153,7 @@
{
ink64 total = 0;
for (int i = 0; i < gnpart; i++)
- total += gpart[i]->len - part_dirlen(gpart[i]) - EVAC_SIZE;
+ total += gpart[i]->len - part_dirlen(gpart[i]) - EVACUATION_SIZE;
return total;
}
@@ -224,6 +217,22 @@
}
VIO *
+CacheVC::do_io_pread(Continuation * c, ink64 nbytes, MIOBuffer * abuf, ink_off_t off)
+{
+ ink_assert(vio.op == VIO::READ);
+ vio.buffer.writer_for(abuf);
+ vio.set_continuation(c);
+ vio.ndone = off;
+ vio.nbytes = 0;
+ vio.vc_server = this;
+ seek_to = off;
+ ink_assert(c->mutex->thread_holding);
+ if (!trigger)
+ trigger = c->mutex->thread_holding->schedule_imm_local(this);
+ return &vio;
+}
+
+VIO *
CacheVC::do_io_write(Continuation * c, int nbytes, IOBufferReader * abuf, bool owner)
{
ink_assert(vio.op == VIO::WRITE);
@@ -386,7 +395,7 @@
ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
stat_cache_vcs.enqueue(cont, cont->stat_link);
#endif
- if (cont->f.single_segment)
+ if (cont->f.single_fragment)
return 0;
EThread *t = cont->mutex->thread_holding;
int i = dir_evac_bucket(&cont->earliest_dir);
@@ -481,6 +490,9 @@
#ifdef O_DIRECT
opts |= O_DIRECT;
#endif
+#ifdef O_DSYNC
+ opts |= O_DSYNC;
+#endif
int fd = ink_open(path, opts, 0644);
int blocks = sd->blocks;
@@ -731,7 +743,7 @@
CACHE_PART_SUM_DYN_STAT(cache_bytes_total_stat, part_total_cache_bytes);
- part_total_direntries = gpart[i]->buckets * DIR_SEGMENTS * DIR_DEPTH;
+ part_total_direntries = gpart[i]->buckets * gpart[i]->segments * DIR_DEPTH;
total_direntries += part_total_direntries;
CACHE_PART_SUM_DYN_STAT(cache_direntries_total_stat, part_total_direntries);
@@ -791,7 +803,7 @@
Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %lld = %lldMb",
total_cache_bytes, total_cache_bytes / (1024 * 1024));
- part_total_direntries = gpart[i]->buckets * DIR_SEGMENTS * DIR_DEPTH;
+ part_total_direntries = gpart[i]->buckets * gpart[i]->segments * DIR_DEPTH;
total_direntries += part_total_direntries;
CACHE_PART_SUM_DYN_STAT(cache_direntries_total_stat, part_total_direntries);
@@ -808,7 +820,6 @@
GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_total_stat, total_direntries);
GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_used_stat, used_direntries);
dir_sync_init();
-// dir_compute_stats();
cache_init_ok = 1;
} else
Warning("cache unable to open any parts, disabled");
@@ -864,18 +875,27 @@
tt[strlen(tt) - 1] = 0;
printf(" Create Time: %s\n", tt);
printf(" Sync Serial: %u\n", (int) header->sync_serial);
- printf(" Write Serial: %u\n", (int) header->write_serial);
+ printf(" Write Serial: %u\n", (int) header->write_serial);
printf("\n");
+
return 0;
}
static void
-part_init_data(Part * d)
+part_init_data_internal(Part * d)
{
- d->buckets = ((d->len - (d->start - d->skip))
- / cache_config_min_average_object_size) / DIR_DEPTH;
- d->buckets = ((d->buckets + DIR_SEGMENTS - 1) / DIR_SEGMENTS);
- d->start = d->skip + 2 * part_dirlen(d) + part_metalen(d);
+ d->buckets = ((d->len - (d->start - d->skip)) / cache_config_min_average_object_size) / DIR_DEPTH;
+ d->segments = (d->buckets + (((1<<16)-1)/DIR_DEPTH)) / ((1<<16)/DIR_DEPTH);
+ d->buckets = (d->buckets + d->segments - 1) / d->segments;
+ d->start = d->skip + 2 * part_dirlen(d);
+}
+
+static void
+part_init_data(Part * d) {
+ // iteratively calculate start + buckets
+ part_init_data_internal(d);
+ part_init_data_internal(d);
+ part_init_data_internal(d);
}
void
@@ -883,13 +903,13 @@
{
int b, s, l;
- for (s = 0; s < DIR_SEGMENTS; s++) {
+ for (s = 0; s < d->segments; s++) {
d->header->freelist[s] = 0;
Dir *seg = dir_segment(s, d);
for (l = 1; l < DIR_DEPTH; l++) {
for (b = 0; b < d->buckets; b++) {
Dir *bucket = dir_bucket(b, seg);
- dir_free_entry(&bucket[l], s, d);
+ dir_free_entry(dir_bucket_row(bucket, l), s, d);
}
}
}
@@ -923,11 +943,6 @@
Warning("unable to clear cache directory '%s'", d->hash_id);
return -1;
}
- if (pwrite(d->fd, d->raw_dir, ROUND_TO_BLOCK(sizeof(PartHeaderFooter)), d->skip + dir_len) < 0) {
- Warning("unable to clear cache directory '%s'", d->hash_id);
- return -1;
- }
-
return 0;
}
@@ -971,8 +986,6 @@
// successive approximation, directory/meta data eats up some storage
start = dir_skip;
part_init_data(this);
- part_init_data(this);
- part_init_data(this);
data_blocks = (len - (start - skip)) / INK_BLOCK_SIZE;
#ifdef HIT_EVACUATE
hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 100;
@@ -999,11 +1012,9 @@
raw_dir = (char *) (((unsigned int) ((char *) (raw_dir) + (alignment - 1))) & ~(alignment - 1));
#endif
- dir = (Dir *) (raw_dir + ROUND_TO_BLOCK(sizeof(PartHeaderFooter)));
+ dir = (Dir *) (raw_dir + part_headerlen(this));
header = (PartHeaderFooter *) raw_dir;
footer = (PartHeaderFooter *) (raw_dir + part_dirlen(this) - ROUND_TO_BLOCK(sizeof(PartHeaderFooter)));
- for (i = 0; i < DIR_SEGMENTS; i++)
- segment[i] = part_dir_segment(this, i);
if (clear) {
Note("clearing cache directory '%s'", hash_id);
@@ -1011,8 +1022,8 @@
}
init_info = new PartInitInfo();
- int dir_len = ROUND_TO_BLOCK(sizeof(PartHeaderFooter));
- ink_off_t footer_offset = part_dirlen(this) - dir_len;
+ int footerlen = ROUND_TO_BLOCK(sizeof(PartHeaderFooter));
+ ink_off_t footer_offset = part_dirlen(this) - footerlen;
// try A
ink_off_t as = skip;
if (is_debug_tag_set("cache_init"))
@@ -1021,7 +1032,6 @@
init_info->part_aio[0].aiocb.aio_offset = as;
init_info->part_aio[1].aiocb.aio_offset = as + footer_offset;
ink_off_t bs = skip + part_dirlen(this);
-
init_info->part_aio[2].aiocb.aio_offset = bs;
init_info->part_aio[3].aiocb.aio_offset = bs + footer_offset;
@@ -1030,7 +1040,7 @@
aio->aiocb.aio_fildes = fd;
aio->aiocb.aio_reqprio = 0;
aio->aiocb.aio_buf = &(init_info->part_h_f[i * INK_BLOCK_SIZE]);
- aio->aiocb.aio_nbytes = dir_len;
+ aio->aiocb.aio_nbytes = footerlen;
aio->action = this;
aio->thread = this_ethread();
aio->then = (i < 3) ? &(init_info->part_aio[i + 1]) : 0;
@@ -1089,9 +1099,7 @@
clear_dir();
return EVENT_DONE;
}
-#ifdef DEBUG
- check_dir(this);
-#endif
+ CHECK_DIR(this);
SET_HANDLER(&Part::handle_recover_from_data);
return handle_recover_from_data(EVENT_IMMEDIATE, 0);
@@ -1156,11 +1164,11 @@
recover_pos = start;
}
#if defined(_WIN32)
- io.aiocb.aio_buf = (char *) malloc(MAX_RECOVER_BYTES);
+ io.aiocb.aio_buf = (char *) malloc(RECOVERY_SIZE);
#else
- io.aiocb.aio_buf = (char *) valloc(MAX_RECOVER_BYTES);
+ io.aiocb.aio_buf = (char *) valloc(RECOVERY_SIZE);
#endif
- io.aiocb.aio_nbytes = MAX_RECOVER_BYTES;
+ io.aiocb.aio_nbytes = RECOVERY_SIZE;
if ((ink_off_t)(recover_pos + io.aiocb.aio_nbytes) > (ink_off_t)(skip + len))
io.aiocb.aio_nbytes = (skip + len) - recover_pos;
} else if (event == AIO_EVENT_DONE) {
@@ -1213,7 +1221,7 @@
if (recover_wrapped && start == io.aiocb.aio_offset) {
doc = (Doc *) s;
if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
- recover_pos = skip + len - EVAC_SIZE;
+ recover_pos = skip + len - EVACUATION_SIZE;
goto Ldone;
}
}
@@ -1266,7 +1274,7 @@
else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
recover_wrapped = 1;
recover_pos = start;
- io.aiocb.aio_nbytes = MAX_RECOVER_BYTES;
+ io.aiocb.aio_nbytes = RECOVERY_SIZE;
break;
}
@@ -1281,7 +1289,7 @@
if (recover_pos > (skip + len) - AGG_SIZE) {
recover_wrapped = 1;
recover_pos = start;
- io.aiocb.aio_nbytes = MAX_RECOVER_BYTES;
+ io.aiocb.aio_nbytes = RECOVERY_SIZE;
break;
}
@@ -1295,7 +1303,7 @@
s += round_to_approx_size(doc->len);
}
- /* if (s > e) then we gone through MAX_RECOVER_BYTES; we need to
+ /* if (s > e) then we gone through RECOVERY_SIZE; we need to
read more data off disk and continue recovering */
if (s >= e) {
/* In the last iteration, we increment s by doc->len...need to undo
@@ -1305,7 +1313,7 @@
recover_pos -= e - s;
if (recover_pos >= skip + len)
recover_pos = start;
- io.aiocb.aio_nbytes = MAX_RECOVER_BYTES;
+ io.aiocb.aio_nbytes = RECOVERY_SIZE;
if ((ink_off_t)(recover_pos + io.aiocb.aio_nbytes) > (ink_off_t)(skip + len))
io.aiocb.aio_nbytes = (skip + len) - recover_pos;
}
@@ -1326,8 +1334,8 @@
return handle_recover_write_dir(EVENT_IMMEDIATE, 0);
}
- recover_pos += EVAC_SIZE; // safely cover the max write size
- if (recover_pos < header->write_pos && (recover_pos + EVAC_SIZE >= header->write_pos)) {
+ recover_pos += EVACUATION_SIZE; // safely cover the max write size
+ if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
Debug("cache_init", "Head Pos: %llu, Rec Pos: %llu, Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
Warning("no valid directory found while recovering '%s', clearing", hash_id);
goto Lclear;
@@ -1341,8 +1349,8 @@
if (!(header->sync_serial & 1) == !(next_sync_serial & 1))
next_sync_serial++;
// clear effected portion of the cache
- int clear_start = offset_to_part_offset(this, header->write_pos);
- int clear_end = offset_to_part_offset(this, recover_pos);
+ ink_off_t clear_start = offset_to_part_offset(this, header->write_pos);
+ ink_off_t clear_end = offset_to_part_offset(this, recover_pos);
if (clear_start <= clear_end)
dir_clear_range(clear_start, clear_end, this);
else {
@@ -1354,25 +1362,28 @@
header->write_pos, recover_pos, header->sync_serial, next_sync_serial);
footer->sync_serial = header->sync_serial = next_sync_serial;
- for (int i = 0; i < 2; i++) {
+ for (int i = 0; i < 3; i++) {
AIOCallback *aio = &(init_info->part_aio[i]);
aio->aiocb.aio_fildes = fd;
aio->aiocb.aio_reqprio = 0;
aio->action = this;
aio->thread = this_ethread();
- aio->then = (i < 1) ? &(init_info->part_aio[i + 1]) : 0;
+ aio->then = (i < 2) ? &(init_info->part_aio[i + 1]) : 0;
}
- int headerlen = ROUND_TO_BLOCK(sizeof(PartHeaderFooter));
+ int footerlen = ROUND_TO_BLOCK(sizeof(PartHeaderFooter));
int dirlen = part_dirlen(this);
int B = header->sync_serial & 1;
ink_off_t ss = skip + (B ? dirlen : 0);
init_info->part_aio[0].aiocb.aio_buf = raw_dir;
- init_info->part_aio[0].aiocb.aio_nbytes = dirlen - headerlen;
+ init_info->part_aio[0].aiocb.aio_nbytes = footerlen;
init_info->part_aio[0].aiocb.aio_offset = ss;
- init_info->part_aio[1].aiocb.aio_buf = raw_dir + dirlen - headerlen;
- init_info->part_aio[1].aiocb.aio_nbytes = headerlen;
- init_info->part_aio[1].aiocb.aio_offset = ss + dirlen - headerlen;
+ init_info->part_aio[1].aiocb.aio_buf = raw_dir + footerlen;
+ init_info->part_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
+ init_info->part_aio[1].aiocb.aio_offset = ss + footerlen;
+ init_info->part_aio[2].aiocb.aio_buf = raw_dir + dirlen - footerlen;
+ init_info->part_aio[2].aiocb.aio_nbytes = footerlen;
+ init_info->part_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
SET_HANDLER(&Part::handle_recover_write_dir);
ink_assert(ink_aio_write(init_info->part_aio));
@@ -1433,6 +1444,7 @@
io.action = this;
io.thread = this_ethread();
io.then = 0;
+
if (hf[0]->sync_serial == hf[1]->sync_serial &&
(hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != hf[3]->sync_serial)) {
SET_HANDLER(&Part::handle_dir_read);
@@ -1615,7 +1627,7 @@
for (p = 0; p < gnpart; p++) {
if (d->fd == gpart[p]->fd) {
- total_dir_delete += gpart[p]->buckets * DIR_SEGMENTS * DIR_DEPTH;
+ total_dir_delete += gpart[p]->buckets * gpart[p]->segments * DIR_DEPTH;
used_dir_delete += dir_entries_used(gpart[p]);
total_bytes_delete = gpart[p]->len - part_dirlen(gpart[p]);
}
@@ -1717,15 +1729,6 @@
IOCORE_EstablishStaticConfigInt32(cache_config_min_average_object_size, "proxy.config.cache.min_average_object_size");
Debug("cache_init", "Cache::open - proxy.config.cache.min_average_object_size = %ld",
(long) cache_config_min_average_object_size);
- /* To support partition sizes of upto 8G, we have to limit the
- cache_config_min_average_object_size to be atleast 4096 Bytes.
- This restriction is because we can support only upto 64K dir
- entries per segment in the Part
- */
- if (cache_config_min_average_object_size < 4096) {
- ink_release_assert(!"Fatal Error: proxy.config.cache.min_average_object_size cannot be lesser than 4096");
- }
-
CachePart *cp = cp_list.head;
for (; cp; cp = cp->link.next) {
@@ -1784,103 +1787,101 @@
NOWARN_UNUSED(e);
cancel_trigger();
ink_debug_assert(this_ethread() == mutex->thread_holding);
- if (event != AIO_EVENT_DONE && is_io_in_progress())
- return EVENT_CONT; // reenable read
-
- // aio complete OR lock retry
- set_io_not_in_progress();
- int okay = 1;
- MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (!lock)
- VC_SCHED_LOCK_RETRY();
- if ((!dir_valid(part, &dir)) || (!io.ok())) {
- if (!io.ok()) {
- Debug("cache_disk_error", "Read error on disk %s\n \
+ if (event == AIO_EVENT_DONE)
+ set_io_not_in_progress();
+ else
+ if (is_io_in_progress())
+ return EVENT_CONT;
+ {
+ MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock)
+ VC_SCHED_LOCK_RETRY();
+ if ((!dir_valid(part, &dir)) || (!io.ok())) {
+ if (!io.ok()) {
+ Debug("cache_disk_error", "Read error on disk %s\n \
read range : [%llu - %llu bytes] [%llu - %llu blocks] \n", part->hash_id, io.aiocb.aio_offset, io.aiocb.aio_offset + io.aiocb.aio_nbytes, io.aiocb.aio_offset / 512, (io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
+ }
+ goto Ldone;
}
- POP_HANDLER;
- return handleEvent(AIO_EVENT_DONE, 0);
- }
- ink_assert(part->mutex->nthread_holding < 1000);
- ink_assert(((Doc *) buf->data())->magic == DOC_MAGIC);
+ ink_assert(part->mutex->nthread_holding < 1000);
+ ink_assert(((Doc *) buf->data())->magic == DOC_MAGIC);
#ifdef VERIFY_JTEST_DATA
- char xx[500];
- if (read_key && *read_key == ((Doc *) buf->data())->key && request.valid() && !dir_head(&dir) && !vio.ndone) {
- int ib = 0, xd = 0;
- request.url_get()->print(xx, 500, &ib, &xd);
- char *x = xx;
- for (int q = 0; q < 3; q++)
- x = strchr(x + 1, '/');
- ink_assert(!memcmp(((Doc *) buf->data())->data(), x, ib - (x - xx)));
- }
-#endif
- Doc *doc = (Doc *) buf->data();
- // put into ram cache?
- if (io.ok() &&
- ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) && doc->magic == DOC_MAGIC) {
- f.not_from_ram_cache = 1;
- if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
- // verify that the checksum matches
- inku32 checksum = 0;
- for (char *b = doc->hdr; b < (char *) doc + doc->len; b++)
- checksum += *b;
- ink_assert(checksum == doc->checksum);
- if (checksum != doc->checksum) {
- Note("cache: checksum error for [%llu %llu] len %d, hlen %d, disk %s, offset %llu size %d",
- doc->first_key.b[0], doc->first_key.b[1],
- doc->len, doc->hlen, part->path, io.aiocb.aio_offset, io.aiocb.aio_nbytes);
- doc->magic = DOC_CORRUPT;
- okay = 0;
- }
+ char xx[500];
+ if (read_key && *read_key == ((Doc *) buf->data())->key && request.valid() && !dir_head(&dir) && !vio.ndone) {
+ int ib = 0, xd = 0;
+ request.url_get()->print(xx, 500, &ib, &xd);
+ char *x = xx;
+ for (int q = 0; q < 3; q++)
+ x = strchr(x + 1, '/');
+ ink_assert(!memcmp(((Doc *) buf->data())->data(), x, ib - (x - xx)));
}
- // If http doc, we need to unmarshal the headers before putting
- // in the ram cache.
-#ifdef HTTP_CACHE
- if (doc->hlen > 0 && okay) {
- char *tmp = doc->hdr;
- int len = doc->hlen;
- while (len > 0) {
- int r = HTTPInfo::unmarshal(tmp, len, buf._ptr());
- if (r < 0) {
- ink_assert(!"CacheVC::handleReadDone unmarshal failed");
+#endif
+ Doc *doc = (Doc *) buf->data();
+ // put into ram cache?
+ if (io.ok() &&
+ ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) && doc->magic == DOC_MAGIC) {
+ int okay = 1;
+ f.not_from_ram_cache = 1;
+ if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
+ // verify that the checksum matches
+ inku32 checksum = 0;
+ for (char *b = doc->hdr(); b < (char *) doc + doc->len; b++)
+ checksum += *b;
+ ink_assert(checksum == doc->checksum);
+ if (checksum != doc->checksum) {
+ Note("cache: checksum error for [%llu %llu] len %d, hlen %d, disk %s, offset %llu size %d",
+ doc->first_key.b[0], doc->first_key.b[1],
+ doc->len, doc->hlen, part->path, io.aiocb.aio_offset, io.aiocb.aio_nbytes);
+ doc->magic = DOC_CORRUPT;
okay = 0;
- break;
}
- len -= r;
- tmp += r;
}
- }
-#endif
- //Put the request in the ram cache only if its a open_read or lookup
- if (vio.op == VIO::READ && okay) {
- bool cutoff_check;
- // cutoff_check :
- // doc_len == 0 for the first fragment (it is set from the vector)
- // The decision on the first fragment is based on
- // doc->total_len
- // After that, the decision is based of doc_len (doc_len != 0)
- // (cache_config_ram_cache_cutoff == 0) : no cutoffs
- cutoff_check = ((!doc_len && doc->total_len < part->ram_cache.cutoff_size)
- || (doc_len && doc_len < part->ram_cache.cutoff_size)
- || !part->ram_cache.cutoff_size);
-
- if (cutoff_check) {
- part->ram_cache.put(read_key, buf, mutex->thread_holding, 0, dir_offset(&dir));
- } // end cutoff_check
-
- if (!doc_len) {
- // keep a pointer to it. In case the state machine decides to
- // update this document, we don't have to read it back in memory
- // again
- part->first_fragment.key = *read_key;
- part->first_fragment.auxkey1 = dir_offset(&dir);
- part->first_fragment.data = buf;
+ // If http doc, we need to unmarshal the headers before putting
+ // in the ram cache.
+#ifdef HTTP_CACHE
+ if (doc->ftype == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
+ char *tmp = doc->hdr();
+ int len = doc->hlen;
+ while (len > 0) {
+ int r = HTTPInfo::unmarshal(tmp, len, buf._ptr());
+ if (r < 0) {
+ ink_assert(!"CacheVC::handleReadDone unmarshal failed");
+ okay = 0;
+ break;
+ }
+ len -= r;
+ tmp += r;
+ }
}
- } // end VIO::READ check
- } // end io.ok() check
-
+#endif
+ // Put the request in the ram cache only if its a open_read or lookup
+ if (vio.op == VIO::READ && okay) {
+ bool cutoff_check;
+ // cutoff_check :
+ // doc_len == 0 for the first fragment (it is set from the vector)
+ // The decision on the first fragment is based on
+ // doc->total_len
+ // After that, the decision is based of doc_len (doc_len != 0)
+ // (cache_config_ram_cache_cutoff == 0) : no cutoffs
+ cutoff_check = ((!doc_len && doc->total_len < part->ram_cache.cutoff_size)
+ || (doc_len && doc_len < part->ram_cache.cutoff_size)
+ || !part->ram_cache.cutoff_size);
+ if (cutoff_check)
+ part->ram_cache.put(read_key, buf, mutex->thread_holding, 0, dir_offset(&dir));
+ if (!doc_len) {
+ // keep a pointer to it. In case the state machine decides to
+ // update this document, we don't have to read it back in memory
+ // again
+ part->first_fragment.key = *read_key;
+ part->first_fragment.auxkey1 = dir_offset(&dir);
+ part->first_fragment.data = buf;
+ }
+ } // end VIO::READ check
+ } // end io.ok() check
+ }
+Ldone:
POP_HANDLER;
return handleEvent(AIO_EVENT_DONE, 0);
}
@@ -1893,7 +1894,6 @@
cancel_trigger();
// check ram cache
- ink_assert(event == EVENT_NONE);
ink_debug_assert(part->mutex->thread_holding == this_ethread());
if (part->ram_cache.get(read_key, &buf, 0, dir_offset(&dir))) {
CACHE_INCREMENT_DYN_STAT(cache_ram_cache_hits_stat);
@@ -1906,26 +1906,25 @@
}
CACHE_INCREMENT_DYN_STAT(cache_ram_cache_misses_stat);
- // set ram cache hit flag to false
// see if its in the aggregation buffer
if (dir_agg_buf_valid(part, &dir)) {
int agg_offset = part_offset(part, &dir) - part->header->write_pos;
- buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes), MEMALIGNED);
+ buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned) part->agg_buf_pos);
char *doc = buf->data();
char *agg = part->agg_buffer + agg_offset;
memcpy(doc, agg, io.aiocb.aio_nbytes);
io.aio_result = io.aiocb.aio_nbytes;
SET_HANDLER(&CacheVC::handleReadDone);
- return handleReadDone(AIO_EVENT_DONE, 0);
+ return EVENT_RETURN;
}
io.aiocb.aio_fildes = part->fd;
io.aiocb.aio_offset = part_offset(part, &dir);
if ((ink_off_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > (ink_off_t)(part->skip + part->len))
io.aiocb.aio_nbytes = part->skip + part->len - io.aiocb.aio_offset;
- buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes), MEMALIGNED);
+ buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
io.aiocb.aio_buf = buf->data();
io.action = this;
io.thread = mutex->thread_holding;
@@ -1937,7 +1936,7 @@
LramHit:
io.aio_result = io.aiocb.aio_nbytes;
POP_HANDLER;
- return handleEvent(AIO_EVENT_DONE, 0);
+ return EVENT_RETURN; // allow the caller to release the partition lock
}
Action *
@@ -1949,20 +1948,15 @@
return ACTION_RESULT_DONE;
}
- ink_assert(this);
-
Part *part = key_to_part(key, hostname, host_len);
- CacheVC *c = NULL;
ProxyMutex *mutex = cont->mutex;
-
- c = new_CacheVC(cont);
+ CacheVC *c = new_CacheVC(cont);
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
c->vio.op = VIO::READ;
c->base_stat = cache_lookup_active_stat;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->first_key = c->key = *key;
- if (type == CACHE_FRAG_TYPE_HTTP)
- c->f.http_request = 1;
+ c->frag_type = type;
c->f.lookup = 1;
c->part = part;
c->last_collision = NULL;
@@ -2006,7 +2000,7 @@
if (_action.cancelled)
return free_CacheVC(this);
- MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
VC_SCHED_LOCK_RETRY();
@@ -2077,15 +2071,19 @@
Lcollision:
// check for collision
if (dir_probe(&key, part, &dir, &last_collision) > 0) {
- return do_read(&key);
+ int ret = do_read_call(&key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
}
-
Ldone:
CACHE_INCREMENT_DYN_STAT(cache_remove_failure_stat);
if (od)
part->close_write(this);
_action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void *) -ECACHE_NO_DOC);
return free_CacheVC(this);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
}
Action *
@@ -2107,7 +2105,7 @@
if (!cont)
cont = new_CacheRemoveCont();
- MUTEX_TRY_LOCK(lock, cont->mutex, this_ethread());
+ CACHE_TRY_LOCK(lock, cont->mutex, this_ethread());
ink_assert(lock);
Part *part = key_to_part(key, hostname, host_len);
// coverity[var_decl]
@@ -2117,8 +2115,7 @@
CacheVC *c = new_CacheVC(cont);
c->vio.op = VIO::NONE;
- if (type == CACHE_FRAG_TYPE_HTTP)
- c->f.http_request = 1;
+ c->frag_type = type;
c->base_stat = cache_remove_active_stat;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->first_key = c->key = *key;
@@ -2269,12 +2266,10 @@
if (gdisks[i]->cleared) {
inku64 free_space = gdisks[i]->free_space * STORE_BLOCK_SIZE;
int parts = (free_space / MAX_PART_SIZE) + 1;
- int p = 0;
- for (; p < parts; p++) {
- unsigned int b = gdisks[i]->free_space / (parts - p);
+ for (int p = 0; p < parts; p++) {
+ ink_off_t b = gdisks[i]->free_space / (parts - p);
Debug("cache_hosting", "blocks = %d\n", b);
DiskPartBlock *dpb = gdisks[i]->create_partition(0, b, CACHE_HTTP_TYPE);
-
ink_assert(dpb && dpb->len == b);
}
ink_assert(gdisks[i]->free_space == 0);
@@ -2528,6 +2523,7 @@
Part *
Cache::key_to_part(CacheKey * key, char *hostname, int host_len)
{
+ inku32 h = (key->word(2) >> DIR_TAG_WIDTH) % PART_HASH_TABLE_SIZE;
unsigned short *hash_table = hosttable->gen_host_rec.part_hash_table;
CacheHostRecord *host_rec = &hosttable->gen_host_rec;
if (hosttable->m_numEntries > 0 && host_len) {
@@ -2539,7 +2535,7 @@
char format_str[50];
snprintf(format_str, sizeof(format_str), "Partition: %%xd for host: %%.%ds", host_len);
Debug("cache_hosting", format_str, res.record, hostname);
- return res.record->parts[host_hash_table[key->word(1) % PART_HASH_TABLE_SIZE]];
+ return res.record->parts[host_hash_table[h]];
}
}
}
@@ -2547,7 +2543,7 @@
char format_str[50];
snprintf(format_str, sizeof(format_str), "Generic partition: %%xd for host: %%.%ds", host_len);
Debug("cache_hosting", format_str, host_rec, hostname);
- return host_rec->parts[hash_table[key->word(1) % PART_HASH_TABLE_SIZE]];
+ return host_rec->parts[hash_table[h]];
} else
return host_rec->parts[0];
}
@@ -2839,10 +2835,6 @@
Debug("cache_init", "proxy.config.cache.max_doc_size = %d = %dMb",
cache_config_max_doc_size, cache_config_max_doc_size / (1024 * 1024));
- IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.cache.max_agg_delay", 1000, RECU_DYNAMIC, RECC_NULL, NULL);
- IOCORE_EstablishStaticConfigInt32(cache_config_max_agg_delay, "proxy.config.cache.max_agg_delay");
- Debug("cache_init", "proxy.config.cache.max_agg_delay = %d", cache_config_max_agg_delay);
-
IOCORE_RegisterConfigString(RECT_CONFIG, "proxy.config.config_dir", SYSCONFDIR, RECU_DYNAMIC, RECC_NULL, NULL);
IOCORE_ReadConfigString(cache_system_config_directory, "proxy.config.config_dir", PATH_NAME_MAX);
Debug("cache_init", "proxy.config.config_dir = \"%s\"", cache_system_config_directory);
@@ -2869,10 +2861,6 @@
IOCORE_EstablishStaticConfigInt32(cache_config_max_disk_errors, "proxy.config.cache.max_disk_errors");
Debug("cache_init", "proxy.config.cache.max_disk_errors = %d", cache_config_max_disk_errors);
- IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.cache.check_disk_idle", 1, RECU_DYNAMIC, RECC_NULL, NULL);
- IOCORE_EstablishStaticConfigInt32(cache_config_check_disk_idle, "proxy.config.cache.check_disk_idle");
- Debug("cache_init", "proxy.config.cache.check_disk_idle = %d", cache_config_check_disk_idle);
-
IOCORE_RegisterConfigInteger(RECT_CONFIG,
"proxy.config.cache.agg_write_backlog", 5242880, RECU_DYNAMIC, RECC_NULL, NULL);
IOCORE_EstablishStaticConfigInt32(cache_config_agg_write_backlog, "proxy.config.cache.agg_write_backlog");
@@ -2931,7 +2919,7 @@
CacheLookupHttpConfig * params, time_t pin_in_cache, CacheFragType type)
{
#ifdef CLUSTER_CACHE
- if (CacheClusteringEnabled > 0) {
+ if (cache_clustering_enabled > 0) {
return open_read_internal(CACHE_OPEN_READ_LONG, cont, (MIOBuffer *) 0,
url, request, params, (CacheKey *) 0, pin_in_cache, type, (char *) 0, 0);
}
@@ -2977,7 +2965,7 @@
CacheHTTPHdr * request, CacheHTTPInfo * old_info, time_t pin_in_cache, CacheFragType type)
{
#ifdef CLUSTER_CACHE
- if (CacheClusteringEnabled > 0) {
+ if (cache_clustering_enabled > 0) {
INK_MD5 url_md5;
Cache::generate_key(&url_md5, url, request);
ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
@@ -3024,7 +3012,7 @@
CacheProcessor::remove(Continuation * cont, URL * url, CacheFragType frag_type)
{
#ifdef CLUSTER_CACHE
- if (CacheClusteringEnabled > 0) {
+ if (cache_clustering_enabled > 0) {
}
#endif
if (cache_global_hooks != NULL && cache_global_hooks->hooks_set > 0) {
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc Fri Dec 18 16:47:37 2009
@@ -30,9 +30,6 @@
#endif
#include "ink_stack_trace.h"
-#define SYNC_MAX_WRITE (256 * 1024)
-#define SYNC_DELAY HRTIME_MSECONDS(500)
-
#define CACHE_INC_DIR_USED(_m) do { \
ProxyMutex *mutex = _m; \
CACHE_INCREMENT_DYN_STAT(cache_direntries_used_stat); \
@@ -49,15 +46,10 @@
} while (0);
-// Debugging Options
-
-// #define CHECK_DIR_FAST
-// #define CHECK_DIR
-
// Globals
ClassAllocator<OpenDirEntry> openDirEntryAllocator("openDirEntry");
-Dir empty_dir = { 0 };
+Dir empty_dir;
// OpenDir
@@ -74,12 +66,12 @@
Returns 1 on success and 0 on failure.
*/
int
-OpenDir::open_write(CacheVC * cont, int allow_if_writers, int max_writers)
+OpenDir::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
{
ink_debug_assert(cont->part->mutex->thread_holding == this_ethread());
unsigned int h = cont->first_key.word(0);
int b = h % OPEN_DIR_BUCKETS;
- for (OpenDirEntry * d = bucket[b].head; d; d = d->link.next) {
+ for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) {
if (!(d->writers.head->first_key == cont->first_key))
continue;
if (allow_if_writers && d->num_writers < d->max_writers) {
@@ -110,7 +102,7 @@
}
int
-OpenDir::signal_readers(int event, Event * e)
+OpenDir::signal_readers(int event, Event *e)
{
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
@@ -142,7 +134,7 @@
}
int
-OpenDir::close_write(CacheVC * cont)
+OpenDir::close_write(CacheVC *cont)
{
ink_debug_assert(cont->part->mutex->thread_holding == this_ethread());
cont->od->writers.remove(cont, cont->opendir_link);
@@ -161,18 +153,18 @@
}
OpenDirEntry *
-OpenDir::open_read(INK_MD5 * key)
+OpenDir::open_read(INK_MD5 *key)
{
unsigned int h = key->word(0);
int b = h % OPEN_DIR_BUCKETS;
- for (OpenDirEntry * d = bucket[b].head; d; d = d->link.next)
+ for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next)
if (d->writers.head->first_key == *key)
return d;
return NULL;
}
int
-OpenDirEntry::wait(CacheVC * cont, int msec)
+OpenDirEntry::wait(CacheVC *cont, int msec)
{
ink_debug_assert(cont->part->mutex->thread_holding == this_ethread());
cont->f.open_read_timeout = 1;
@@ -189,7 +181,7 @@
// return value 1 means no loop
// zero indicates loop
int
-dir_bucket_loop_check(Dir * start_dir, Dir * seg)
+dir_bucket_loop_check(Dir *start_dir, Dir *seg)
{
if (start_dir == NULL)
return 1;
@@ -216,7 +208,7 @@
// adds all the directory entries
// in a segment to the segment freelist
void
-dir_init_segment(int s, Part * d)
+dir_init_segment(int s, Part *d)
{
d->header->freelist[s] = 0;
Dir *seg = dir_segment(s, d);
@@ -225,7 +217,7 @@
for (l = 1; l < DIR_DEPTH; l++) {
for (b = 0; b < d->buckets; b++) {
Dir *bucket = dir_bucket(b, seg);
- dir_free_entry(&bucket[l], s, d);
+ dir_free_entry(dir_bucket_row(bucket, l), s, d);
}
}
}
@@ -235,7 +227,7 @@
// Note : abuse of the token bit in dir entries
#if 0
int
-dir_bucket_loop_fix(Dir * start_dir, int s, Part * d)
+dir_bucket_loop_fix(Dir *start_dir, int s, Part *d)
{
int ret = 0;
if (start_dir == NULL)
@@ -257,13 +249,13 @@
}
p1 = p2;
}
- for (Dir * p3 = start_dir; p3; p3 = next_dir(p3, seg))
+ for (Dir *p3 = start_dir; p3; p3 = next_dir(p3, seg))
dir_set_token(p3, 0);
return ret;
}
#else
int
-dir_bucket_loop_fix(Dir * start_dir, int s, Part * d)
+dir_bucket_loop_fix(Dir *start_dir, int s, Part *d)
{
if (!dir_bucket_loop_check(start_dir, dir_segment(s, d))) {
Warning("Dir loop exists, clearing segment %d", s);
@@ -275,7 +267,7 @@
#endif
int
-dir_freelist_length(Part * d, int s)
+dir_freelist_length(Part *d, int s)
{
int free = 0;
Dir *seg = dir_segment(s, d);
@@ -290,7 +282,7 @@
}
int
-dir_bucket_length(Dir * b, int s, Part * d)
+dir_bucket_length(Dir *b, int s, Part *d)
{
Dir *e = b;
int i = 0;
@@ -306,26 +298,26 @@
return i;
}
-
-void
-check_dir(Part * d)
+int
+check_dir(Part *d)
{
int i, s;
Debug("cache_check_dir", "inside check dir");
- for (s = 0; s < DIR_SEGMENTS; s++) {
+ for (s = 0; s < d->segments; s++) {
Dir *seg = dir_segment(s, d);
ink_debug_assert(dir_bucket_loop_check(dir_from_offset(d->header->freelist[s], seg), seg));
for (i = 0; i < d->buckets; i++) {
- Dir RELEASE_UNUSED *b = dir_bucket(i, seg);
- ink_debug_assert(dir_bucket_length(b, s, d) >= 0);
- ink_debug_assert(!dir_next(b) || dir_offset(b));
- ink_debug_assert(dir_bucket_loop_check(b, seg));
+ Dir *b = dir_bucket(i, seg);
+ if (!(dir_bucket_length(b, s, d) >= 0)) return 0;
+ if (!(!dir_next(b) || dir_offset(b))) return 0;
+ if (!(dir_bucket_loop_check(b, seg))) return 0;
}
}
+ return 1;
}
inline void
-unlink_from_freelist(Dir * e, int s, Part * d)
+unlink_from_freelist(Dir *e, int s, Part *d)
{
Dir *seg = dir_segment(s, d);
Dir *p = dir_from_offset(dir_prev(e), seg);
@@ -339,7 +331,7 @@
}
inline Dir *
-dir_delete_entry(Dir * e, Dir * p, int s, Part * d)
+dir_delete_entry(Dir *e, Dir *p, int s, Part *d)
{
Dir *seg = dir_segment(s, d);
int no = dir_next(e);
@@ -368,7 +360,7 @@
}
inline void
-dir_clean_bucket(Dir * b, int s, Part * part)
+dir_clean_bucket(Dir *b, int s, Part *part)
{
Dir *e = b, *p = NULL;
Dir *seg = dir_segment(s, part);
@@ -387,7 +379,6 @@
(long) e, dir_tag(e), (int) dir_offset(e), (long) b, (long) p, dir_bucket_length(b, s, part));
if (dir_offset(e))
CACHE_DEC_DIR_USED(part->mutex);
-
e = dir_delete_entry(e, p, s, part);
continue;
}
@@ -397,7 +388,7 @@
}
void
-dir_clean_segment(int s, Part * d)
+dir_clean_segment(int s, Part *d)
{
Dir *seg = dir_segment(s, d);
for (int i = 0; i < d->buckets; i++) {
@@ -407,23 +398,20 @@
}
void
-dir_clean_part(Part * d)
+dir_clean_part(Part *d)
{
- for (int i = 0; i < DIR_SEGMENTS; i++)
+ for (int i = 0; i < d->segments; i++)
dir_clean_segment(i, d);
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
}
void
-dir_clear_range(int start, int end, Part * part)
+dir_clear_range(ink_off_t start, ink_off_t end, Part *part)
{
- for (int i = 0; i < part->buckets * DIR_DEPTH * DIR_SEGMENTS; i++) {
+ for (int i = 0; i < part->buckets * DIR_DEPTH * part->segments; i++) {
Dir *e = dir_index(part, i);
if (!dir_token(e) && (int) dir_offset(e) >= start && (int) dir_offset(e) < end) {
CACHE_DEC_DIR_USED(part->mutex);
-
dir_set_offset(e, 0); // delete
}
}
@@ -431,7 +419,7 @@
}
void
-check_bucket_not_contains(Dir * b, Dir * e, Dir * seg)
+check_bucket_not_contains(Dir *b, Dir *e, Dir *seg)
{
Dir *x = b;
do {
@@ -443,7 +431,7 @@
}
void
-freelist_clean(int s, Part * part)
+freelist_clean(int s, Part *part)
{
dir_clean_segment(s, part);
if (part->header->freelist[s])
@@ -457,7 +445,6 @@
Dir *e = dir_bucket_row(b, l);
if (dir_head(e) && !(n++ % 10)) {
CACHE_DEC_DIR_USED(part->mutex);
-
dir_set_offset(e, 0); // delete
}
}
@@ -466,7 +453,7 @@
}
inline Dir *
-freelist_pop(int s, Part * d)
+freelist_pop(int s, Part *d)
{
Dir *seg = dir_segment(s, d);
Dir *e = dir_from_offset(d->header->freelist[s], seg);
@@ -487,7 +474,7 @@
}
int
-dir_segment_accounted(int s, Part * d, int offby, int *f, int *u, int *et, int *v, int *av, int *as)
+dir_segment_accounted(int s, Part *d, int offby, int *f, int *u, int *et, int *v, int *av, int *as)
{
int free = dir_freelist_length(d, s);
int used = 0, empty = 0;
@@ -531,7 +518,7 @@
}
void
-dir_free_entry(Dir * e, int s, Part * d)
+dir_free_entry(Dir *e, int s, Part *d)
{
Dir *seg = dir_segment(s, d);
unsigned int fo = d->header->freelist[s];
@@ -543,17 +530,15 @@
}
int
-dir_probe(CacheKey * key, Part * d, Dir * result, Dir ** last_collision)
+dir_probe(CacheKey *key, Part *d, Dir *result, Dir ** last_collision)
{
ink_debug_assert(d->mutex->thread_holding == this_ethread());
- int s = key->word(0) % DIR_SEGMENTS;
- int b = (key->word(0) / DIR_SEGMENTS) % d->buckets;
+ int s = key->word(0) % d->segments;
+ int b = key->word(1) % d->buckets;
Dir *seg = dir_segment(s, d);
Dir *e = NULL, *p = NULL, *collision = *last_collision;
Part *part = d;
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
#ifdef LOOP_CHECK_MODE
if (dir_bucket_loop_fix(dir_bucket(b, seg), s, d))
return 0;
@@ -581,7 +566,7 @@
goto Lcont;
}
if (dir_valid(d, e)) {
- Debug("dir_probe_hit", "found %X part %d bucket %d boffset %d", key->word(0), d->fd, b, (int) dir_offset(e));
+ Debug("dir_probe_hit", "found %X %X part %d bucket %d boffset %d", key->word(0), key->word(1), d->fd, b, (int) dir_offset(e));
dir_assign(result, e);
*last_collision = e;
ink_assert(dir_offset(e) * INK_BLOCK_SIZE < d->len);
@@ -592,7 +577,7 @@
continue;
}
} else
- Debug("dir_probe_tag", "tag mismatch %X %X vs expected %X", e, dir_tag(e), key->word(1));
+ Debug("dir_probe_tag", "tag mismatch %X %X vs expected %X", e, dir_tag(e), key->word(3));
Lcont:
p = e;
e = next_dir(e, seg);
@@ -603,35 +588,31 @@
collision = NULL;
goto Lagain;
}
- Debug("dir_probe_miss", "missed %X on part %d bucket %d at %X", key->word(0), d->fd, b, (long) seg);
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ Debug("dir_probe_miss", "missed %X %X on part %d bucket %d at %X", key->word(0), key->word(1), d->fd, b, (long) seg);
+ CHECK_DIR(d);
return 0;
}
int
-dir_insert(CacheKey * key, Part * d, Dir * to_part)
+dir_insert(CacheKey *key, Part *d, Dir *to_part)
{
ink_debug_assert(d->mutex->thread_holding == this_ethread());
- int s = key->word(0) % DIR_SEGMENTS, l;
- int bi = (key->word(0) / DIR_SEGMENTS) % d->buckets;
- ink_assert((unsigned int) dir_approx_size(to_part) <= (unsigned int) (MAX_FRAG_SIZE + sizeofDoc)); // XXX - size should be unsigned
+ int s = key->word(0) % d->segments, l;
+ int bi = key->word(1) % d->buckets;
+ ink_assert(dir_approx_size(to_part) <= MAX_FRAG_SIZE + sizeofDoc);
Dir *seg = dir_segment(s, d);
Dir *e = NULL;
Dir *b = dir_bucket(bi, seg);
Part *part = d;
-#if defined(DEBUG) && defined(CHECK_DIR_FAST)
- unsigned int t = DIR_MASK_TAG(key->word(1));
+#if defined(DEBUG) && defined(DO_CHECK_DIR_FAST)
+ unsigned int t = DIR_MASK_TAG(key->word(2));
Dir *col = b;
while (col) {
ink_assert((dir_tag(col) != t) || (dir_offset(col) != dir_offset(to_part)));
col = next_dir(col, seg);
}
#endif
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
Lagain:
// get from this row first
@@ -654,36 +635,32 @@
dir_set_next(b, dir_to_offset(e, seg));
Lfill:
dir_assign_data(e, to_part);
- dir_set_tag(e, key->word(1));
+ dir_set_tag(e, key->word(2));
ink_assert(part_offset(d, e) < (d->skip + d->len));
Debug("dir_insert",
"insert %X %X into part %d bucket %d at %X tag %X %X boffset %d",
(long) e, key->word(0), d->fd, bi, (long) e, key->word(1), dir_tag(e), (int) dir_offset(e));
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
d->header->dirty = 1;
CACHE_INC_DIR_USED(d->mutex);
return 1;
}
int
-dir_overwrite(CacheKey * key, Part * d, Dir * dir, Dir * overwrite, bool must_overwrite)
+dir_overwrite(CacheKey *key, Part *d, Dir *dir, Dir *overwrite, bool must_overwrite)
{
ink_debug_assert(d->mutex->thread_holding == this_ethread());
- int s = key->word(0) % DIR_SEGMENTS, l;
- int bi = (key->word(0) / DIR_SEGMENTS) % d->buckets;
+ int s = key->word(0) % d->segments, l;
+ int bi = key->word(1) % d->buckets;
Dir *seg = dir_segment(s, d);
Dir *e = NULL;
Dir *b = dir_bucket(bi, seg);
- unsigned int t = DIR_MASK_TAG(key->word(1));
+ unsigned int t = DIR_MASK_TAG(key->word(2));
int res = 1;
int loop_count = 0;
bool loop_possible = true;
Part *part = d;
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
ink_assert((unsigned int) dir_approx_size(dir) <= (unsigned int) (MAX_FRAG_SIZE + sizeofDoc)); // XXX - size should be unsigned
Lagain:
@@ -735,26 +712,22 @@
Debug("dir_overwrite",
"overwrite %X %X into part %d bucket %d at %X tag %X %X boffset %d",
(long) e, key->word(0), d->fd, bi, (long) e, t, dir_tag(e), (int) dir_offset(e));
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
d->header->dirty = 1;
return res;
}
int
-dir_delete(CacheKey * key, Part * d, Dir * del)
+dir_delete(CacheKey *key, Part *d, Dir *del)
{
ink_debug_assert(d->mutex->thread_holding == this_ethread());
- int s = key->word(0) % DIR_SEGMENTS;
- int b = (key->word(0) / DIR_SEGMENTS) % d->buckets;
+ int s = key->word(0) % d->segments;
+ int b = key->word(1) % d->buckets;
Dir *seg = dir_segment(s, d);
Dir *e = NULL, *p = NULL;
int loop_count = 0;
Part *part = d;
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
e = dir_bucket(b, seg);
if (dir_offset(e))
@@ -769,24 +742,20 @@
if (dir_compare_tag(e, key) && dir_offset(e) == dir_offset(del)) {
CACHE_DEC_DIR_USED(d->mutex);
dir_delete_entry(e, p, s, d);
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
return 1;
}
p = e;
e = next_dir(e, seg);
} while (e);
-#if defined(DEBUG) && defined(CHECK_DIR)
- check_dir(d);
-#endif
+ CHECK_DIR(d);
return 0;
}
// Lookaside Cache
int
-dir_lookaside_probe(CacheKey * key, Part * d, Dir * result, EvacuationBlock ** eblock)
+dir_lookaside_probe(CacheKey *key, Part *d, Dir *result, EvacuationBlock ** eblock)
{
ink_debug_assert(d->mutex->thread_holding == this_ethread());
int i = key->word(3) % LOOKASIDE_SIZE;
@@ -808,10 +777,10 @@
}
int
-dir_lookaside_insert(EvacuationBlock * eblock, Part * d, Dir * to)
+dir_lookaside_insert(EvacuationBlock *eblock, Part *d, Dir *to)
{
CacheKey *key = &eblock->evac_frags.earliest_key;
- Debug("dir_lookaside", "insert %X, offset %d phase %d", key->word(0), (int) dir_offset(to), (int) dir_phase(to));
+ Debug("dir_lookaside", "insert %X %X, offset %d phase %d", key->word(0), key->word(1), (int) dir_offset(to), (int) dir_phase(to));
ink_debug_assert(d->mutex->thread_holding == this_ethread());
int i = key->word(3) % LOOKASIDE_SIZE;
EvacuationBlock *b = new_EvacuationBlock(d->mutex->thread_holding);
@@ -826,7 +795,7 @@
}
int
-dir_lookaside_fixup(CacheKey * key, Part * d)
+dir_lookaside_fixup(CacheKey *key, Part *d)
{
ink_debug_assert(d->mutex->thread_holding == this_ethread());
int i = key->word(3) % LOOKASIDE_SIZE;
@@ -834,16 +803,16 @@
while (b) {
if (b->evac_frags.key == *key) {
int res = dir_overwrite(key, d, &b->new_dir, &b->dir, false);
- Debug("dir_lookaside", "fixup %X offset %d phase %d %d",
- key->word(0), dir_offset(&b->new_dir), dir_phase(&b->new_dir), res);
+ Debug("dir_lookaside", "fixup %X %X offset %d phase %d %d",
+ key->word(0), key->word(1), dir_offset(&b->new_dir), dir_phase(&b->new_dir), res);
d->ram_cache.fixup(key, 0, dir_offset(&b->dir), 0, dir_offset(&b->new_dir));
d->lookaside[i].remove(b);
#if 0
// we need to do this because in case of a small cache, the scan
// might have occured before we inserted this directory entry (if we
// wrapped around fast enough)
- int part_end_offset = offset_to_part_offset(d, d->len + d->skip);
- int part_write_offset = offset_to_part_offset(d, d->header->write_pos);
+ ink_off_t part_end_offset = offset_to_part_offset(d, d->len + d->skip);
+ ink_off_t part_write_offset = offset_to_part_offset(d, d->header->write_pos);
if ((dir_offset(&b->new_dir) + part_end_offset - part_write_offset)
% part_end_offset <= offset_to_part_offset(d, EVAC_SIZE + (d->len / PIN_SCAN_EVERY)))
d->force_evacuate_head(&b->new_dir, dir_pinned(&b->new_dir));
@@ -853,12 +822,12 @@
}
b = b->link.next;
}
- Debug("dir_lookaside", "fixup %X failed", key->word(0));
+ Debug("dir_lookaside", "fixup %X %X failed", key->word(0), key->word(1));
return 0;
}
void
-dir_lookaside_cleanup(Part * d)
+dir_lookaside_cleanup(Part *d)
{
ink_debug_assert(d->mutex->thread_holding == this_ethread());
for (int i = 0; i < LOOKASIDE_SIZE; i++) {
@@ -866,7 +835,8 @@
while (b) {
if (!dir_valid(d, &b->new_dir)) {
EvacuationBlock *nb = b->link.next;
- Debug("dir_lookaside", "cleanup %X cleaned up", b->evac_frags.earliest_key.word(0));
+ Debug("dir_lookaside", "cleanup %X %X cleaned up",
+ b->evac_frags.earliest_key.word(0), b->evac_frags.earliest_key.word(1));
d->lookaside[i].remove(b);
free_CacheVC(b->earliest_evacuator);
free_EvacuationBlock(b, d->mutex->thread_holding);
@@ -880,22 +850,22 @@
}
void
-dir_lookaside_remove(CacheKey * key, Part * d)
+dir_lookaside_remove(CacheKey *key, Part *d)
{
ink_debug_assert(d->mutex->thread_holding == this_ethread());
int i = key->word(3) % LOOKASIDE_SIZE;
EvacuationBlock *b = d->lookaside[i].head;
while (b) {
if (b->evac_frags.key == *key) {
- Debug("dir_lookaside", "remove %X offset %d phase %d",
- key->word(0), dir_offset(&b->new_dir), dir_phase(&b->new_dir));
+ Debug("dir_lookaside", "remove %X %X offset %d phase %d",
+ key->word(0), key->word(1), dir_offset(&b->new_dir), dir_phase(&b->new_dir));
d->lookaside[i].remove(b);
free_EvacuationBlock(b, d->mutex->thread_holding);
return;
}
b = b->link.next;
}
- Debug("dir_lookaside", "remove %X failed", key->word(0));
+ Debug("dir_lookaside", "remove %X %X failed", key->word(0), key->word(1));
return;
}
@@ -922,12 +892,12 @@
}
inku64
-dir_entries_used(Part * d)
+dir_entries_used(Part *d)
{
inku64 full = 0;
inku64 sfull = 0;
- for (int s = 0; s < DIR_SEGMENTS; full += sfull, s++) {
+ for (int s = 0; s < d->segments; full += sfull, s++) {
Dir *seg = dir_segment(s, d);
sfull = 0;
for (int b = 0; b < d->buckets; b++) {
@@ -1022,13 +992,10 @@
Debug("cache_dir_sync", "Periodic dir sync in progress -- overwriting");
}
d->footer->sync_serial = d->header->sync_serial;
-#ifdef DEBUG
- check_dir(d);
-#endif
+ CHECK_DIR(d);
memcpy(buf, d->raw_dir, dirlen);
int B = d->header->sync_serial & 1;
ink_off_t start = d->skip + (B ? dirlen : 0);
-
B = pwrite(d->fd, buf, dirlen, start);
ink_debug_assert(B == dirlen);
Debug("cache_dir_sync", "done syncing dir for part %s", d->hash_id);
@@ -1041,7 +1008,7 @@
int
-CacheSync::mainEvent(int event, Event * e)
+CacheSync::mainEvent(int event, Event *e)
{
NOWARN_UNUSED(e);
NOWARN_UNUSED(event);
@@ -1121,9 +1088,7 @@
}
d->header->sync_serial++;
d->footer->sync_serial = d->header->sync_serial;
-#ifdef DEBUG
- check_dir(d);
-#endif
+ CHECK_DIR(d);
memcpy(buf, d->raw_dir, dirlen);
d->dir_sync_in_progress = 1;
}
@@ -1169,11 +1134,12 @@
{
NOWARN_UNUSED(fix);
int hist[HIST_DEPTH + 1] = { 0 };
- int shist[DIR_SEGMENTS] = { 0 };
+ int *shist = (int*)xmalloc(segments * sizeof(int));
+ memset(shist, 0, segments * sizeof(int));
int j;
int stale = 0, full = 0, empty = 0;
int last = 0, free = 0;
- for (int s = 0; s < DIR_SEGMENTS; s++) {
+ for (int s = 0; s < segments; s++) {
Dir *seg = dir_segment(s, this);
for (int b = 0; b < buckets; b++) {
int h = 0;
@@ -1201,11 +1167,11 @@
last = t;
free += dir_freelist_length(this, s);
}
- int total = buckets * DIR_SEGMENTS * DIR_DEPTH;
+ int total = buckets * segments * DIR_DEPTH;
printf(" Directory for [%s]\n", hash_id);
printf(" Bytes: %d\n", total * SIZEOF_DIR);
- printf(" Segments: %d\n", DIR_SEGMENTS);
- printf(" Buckets: %d\n", buckets);
+ printf(" Segments: %lld\n", (inku64)segments);
+ printf(" Buckets: %lld\n", (inku64)buckets);
printf(" Entries: %d\n", total);
printf(" Full: %d\n", full);
printf(" Empty: %d\n", empty);
@@ -1213,25 +1179,26 @@
printf(" Free: %d\n", free);
printf(" Bucket Fullness: ");
for (j = 0; j < HIST_DEPTH; j++) {
- printf("%5d ", hist[j]);
- if ((j % 5 == 4))
+ printf("%8d ", hist[j]);
+ if ((j % 4 == 3))
printf("\n" " ");
}
printf("\n");
printf(" Segment Fullness: ");
- for (j = 0; j < DIR_SEGMENTS; j++) {
+ for (j = 0; j < segments; j++) {
printf("%5d ", shist[j]);
if ((j % 5 == 4))
printf("\n" " ");
}
printf("\n");
printf(" Freelist Fullness: ");
- for (j = 0; j < DIR_SEGMENTS; j++) {
+ for (j = 0; j < segments; j++) {
printf("%5d ", dir_freelist_length(this, j));
if ((j % 5 == 4))
printf("\n" " ");
}
printf("\n");
+ ::xfree(shist);
return 0;
}
@@ -1322,7 +1289,7 @@
}
void
-regress_rand_CacheKey(CacheKey * key)
+regress_rand_CacheKey(CacheKey *key)
{
unsigned int *x = (unsigned int *) key;
for (int i = 0; i < 4; i++)
@@ -1330,7 +1297,7 @@
}
void
-dir_corrupt_bucket(Dir * b, int s, Part * d)
+dir_corrupt_bucket(Dir *b, int s, Part *d)
{
// coverity[secure_coding]
int l = ((int) (dir_bucket_length(b, s, d) * drand48()));
@@ -1343,29 +1310,7 @@
dir_next(e) = dir_to_offset(e, seg);
}
-struct CacheDirReg:Continuation
-{
- int *status;
-
- CacheDirReg(int *_status):status(_status)
- {
- SET_HANDLER(&CacheDirReg::signal_reg);
- eventProcessor.schedule_in(this, 120 * HRTIME_SECOND);
-
- }
-
- int signal_reg(int event, Event * e)
- {
- NOWARN_UNUSED(e);
- NOWARN_UNUSED(event);
-
- *status = REGRESSION_TEST_PASSED;
- return EVENT_DONE;
- }
-
-};
-
-EXCLUSIVE_REGRESSION_TEST(Cache_dir) (RegressionTest * t, int atype, int *status) {
+EXCLUSIVE_REGRESSION_TEST(Cache_dir) (RegressionTest *t, int atype, int *status) {
NOWARN_UNUSED(atype);
ink_hrtime ttime;
int ret = REGRESSION_TEST_PASSED;
@@ -1394,7 +1339,7 @@
CacheKey key;
rand_CacheKey(&key, thread->mutex);
- int s = key.word(0) % DIR_SEGMENTS, i, j;
+ int s = key.word(0) % d->segments, i, j;
Dir *seg = dir_segment(s, d);
// test insert
@@ -1457,27 +1402,15 @@
}
- /* introduce loops */
- /* in bucket */
-#if 0
- for (int sno = 0; sno < DIR_SEGMENTS; sno++) {
- for (int bno = 0; bno < d->buckets; bno++) {
- // coverity[secure_coding]
- if (drand48() < 0.01)
- rprintf(t, "creating loop in bucket %d, seg %d\n", bno, sno);
- dir_corrupt_bucket(dir_bucket(bno, dir_segment(sno, d)), sno, d);
- }
- }
-
-#else
Dir *last_collision = 0, *seg1 = 0;
int s1, b1;
- for (int ntimes = 0; ntimes < 1000; ntimes++) {
- rprintf(t, "dir_probe in bucket with loop\n");
+ rprintf(t, "corrupt_bucket test\n");
+ for (int ntimes = 0; ntimes < 10; ntimes++) {
+ // dir_probe in bucket with loop
rand_CacheKey(&key, thread->mutex);
- s1 = key.word(0) % DIR_SEGMENTS;
- b1 = (key.word(0) / DIR_SEGMENTS) % d->buckets;
+ s1 = key.word(0) % d->segments;
+ b1 = key.word(1) % d->buckets;
dir_corrupt_bucket(dir_bucket(b1, dir_segment(s1, d)), s1, d);
dir_insert(&key, d, &dir);
last_collision = 0;
@@ -1485,17 +1418,17 @@
rand_CacheKey(&key, thread->mutex);
- s1 = key.word(0) % DIR_SEGMENTS;
- b1 = (key.word(0) / DIR_SEGMENTS) % d->buckets;
+ s1 = key.word(0) % d->segments;
+ b1 = key.word(1) % d->buckets;
dir_corrupt_bucket(dir_bucket(b1, dir_segment(s1, d)), s1, d);
last_collision = 0;
dir_probe(&key, d, &dir, &last_collision);
- rprintf(t, "dir_overwrite in bucket with loop\n");
+ // dir_overwrite in bucket with loop
rand_CacheKey(&key, thread->mutex);
- s1 = key.word(0) % DIR_SEGMENTS;
- b1 = (key.word(0) / DIR_SEGMENTS) % d->buckets;
+ s1 = key.word(0) % d->segments;
+ b1 = key.word(1) % d->buckets;
CacheKey key1;
key1.b[1] = 127;
Dir dir1 = dir;
@@ -1508,43 +1441,38 @@
dir_overwrite(&key, d, &dir, &dir, 1);
rand_CacheKey(&key, thread->mutex);
- s1 = key.word(0) % DIR_SEGMENTS;
- b1 = (key.word(0) / DIR_SEGMENTS) % d->buckets;
+ s1 = key.word(0) % d->segments;
+ b1 = key.word(1) % d->buckets;
key.b[1] = 23;
dir_insert(&key, d, &dir1);
dir_corrupt_bucket(dir_bucket(b1, dir_segment(s1, d)), s1, d);
dir_overwrite(&key, d, &dir, &dir, 0);
-
rand_CacheKey(&key, thread->mutex);
- s1 = key.word(0) % DIR_SEGMENTS;
+ s1 = key.word(0) % d->segments;
seg1 = dir_segment(s1, d);
- rprintf(t, "dir_freelist_length in freelist with loop: segment %d\n", s1);
+ // dir_freelist_length in freelist with loop
dir_corrupt_bucket(dir_from_offset(d->header->freelist[s], seg1), s1, d);
dir_freelist_length(d, s1);
-
-
rand_CacheKey(&key, thread->mutex);
- s1 = key.word(0) % DIR_SEGMENTS;
- b1 = (key.word(0) / DIR_SEGMENTS) % d->buckets;
- rprintf(t, "dir_bucket_length in bucket with loop: segment %d\n", s1);
+ s1 = key.word(0) % d->segments;
+ b1 = key.word(1) % d->buckets;
+ // dir_bucket_length in bucket with loop
dir_corrupt_bucket(dir_bucket(b1, dir_segment(s1, d)), s1, d);
dir_bucket_length(dir_bucket(b1, dir_segment(s1, d)), s1, d);
-
-
+ // this assert by design
rand_CacheKey(&key, thread->mutex);
- s1 = key.word(0) % DIR_SEGMENTS;
- b1 = (key.word(0) / DIR_SEGMENTS) % d->buckets;
- rprintf(t, "check_dir in bucket with loop: segment %d %d %d\n", 3, 23, 17);
+ s1 = key.word(0) % d->segments;
+ b1 = key.word(1) % d->buckets;
+
dir_corrupt_bucket(dir_bucket(b1, dir_segment(3, d)), 3, d);
dir_corrupt_bucket(dir_bucket(b1, dir_segment(7, d)), 7, d);
dir_corrupt_bucket(dir_bucket(b1, dir_segment(17, d)), 17, d);
- check_dir(d);
+ if (check_dir(d))
+ ret = REGRESSION_TEST_FAILED;
}
part_dir_clear(d);
-#endif
-
- NEW(new CacheDirReg(status));
+ *status = ret;
}
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc Fri Dec 18 16:47:37 2009
@@ -205,7 +205,7 @@
/* size is in store blocks */
DiskPartBlock *
-CacheDisk::create_partition(int number, int size_in_blocks, int scheme)
+CacheDisk::create_partition(int number, ink_off_t size_in_blocks, int scheme)
{
if (size_in_blocks == 0)
@@ -216,7 +216,7 @@
if (!q)
return NULL;
- int max_blocks = MAX_PART_SIZE >> STORE_BLOCK_SHIFT;
+ ink_off_t max_blocks = MAX_PART_SIZE >> STORE_BLOCK_SHIFT;
size_in_blocks = (size_in_blocks <= max_blocks) ? size_in_blocks : max_blocks;
int blocks_per_part = PART_BLOCK_SIZE / STORE_BLOCK_SIZE;
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheHttp.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheHttp.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheHttp.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheHttp.cc Fri Dec 18 16:47:37 2009
@@ -228,7 +228,7 @@
/*-------------------------------------------------------------------------
-------------------------------------------------------------------------*/
-int
+inku32
CacheHTTPInfoVector::get_handles(const char *buf, int length, RefCountObj * block_ptr)
{
ink_assert(!(((long) buf) & 3)); // buf must be aligned
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheLink.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheLink.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheLink.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheLink.cc Fri Dec 18 16:47:37 2009
@@ -84,35 +84,41 @@
ink_assert(caches[type] == this);
- Action *action = NULL;
-
Part *part = key_to_part(key, hostname, host_len);
- Dir result = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ Dir result;
Dir *last_collision = NULL;
+ CacheVC *c = NULL;
+ {
+ MUTEX_TRY_LOCK(lock, part->mutex, cont->mutex->thread_holding);
+ if (lock) {
+ if (!dir_probe(key, part, &result, &last_collision)) {
+ cont->handleEvent(CACHE_EVENT_DEREF_FAILED, (void *) -ECACHE_NO_DOC);
+ return ACTION_RESULT_DONE;
+ }
+ }
+ c = new_CacheVC(cont);
+ SET_CONTINUATION_HANDLER(c, &CacheVC::derefRead);
+ c->first_key = c->key = *key;
+ c->part = part;
+ c->dir = result;
+ c->last_collision = last_collision;
- MUTEX_TRY_LOCK(lock, part->mutex, cont->mutex->thread_holding);
- if (lock) {
- if (!dir_probe(key, part, &result, &last_collision)) {
- cont->handleEvent(CACHE_EVENT_DEREF_FAILED, (void *) -ECACHE_NO_DOC);
- return ACTION_RESULT_DONE;
+ if (!lock) {
+ c->mutex->thread_holding->schedule_in_local(c, MUTEX_RETRY_DELAY);
+ return &c->_action;
}
- }
- CacheVC *c = new_CacheVC(cont);
- SET_CONTINUATION_HANDLER(c, &CacheVC::derefRead);
- c->first_key = c->key = *key;
- c->part = part;
- c->dir = result;
- c->last_collision = last_collision;
- if (!lock) {
- c->mutex->thread_holding->schedule_in_local(c, MUTEX_RETRY_DELAY);
- return &c->_action;
+ switch (c->do_read_call(&c->key)) {
+ case EVENT_DONE: return ACTION_RESULT_DONE;
+ case EVENT_RETURN: goto Lcallreturn;
+ default: return &c->_action;
+ }
}
-
- if (c->do_read(&c->key) == EVENT_CONT)
- return &c->_action;
+Lcallreturn:
+ if (c->handleEvent(AIO_EVENT_DONE, 0) == EVENT_DONE)
+ return ACTION_RESULT_DONE;
else
- return action;
+ return &c->_action;
}
int
@@ -149,10 +155,16 @@
mutex->thread_holding->schedule_in_local(this, MUTEX_RETRY_DELAY);
return EVENT_CONT;
}
- if (dir_probe(&key, part, &dir, &last_collision))
- return do_read(&key);
+ if (dir_probe(&key, part, &dir, &last_collision)) {
+ int ret = do_read_call(&first_key);
+ if (ret == EVENT_RETURN)
+ goto Lcallreturn;
+ return ret;
+ }
}
Ldone:
_action.continuation->handleEvent(CACHE_EVENT_DEREF_FAILED, (void *) -ECACHE_NO_DOC);
return free_CacheVC(this);
+Lcallreturn:
+ return handleEvent(AIO_EVENT_DONE, 0); // hopefully a tail call
}
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePages.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePages.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePages.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePages.cc Fri Dec 18 16:47:37 2009
@@ -305,6 +305,8 @@
CHECK_SHOW(show("<tr><td>sync_serial</td><td>%lu</tr>\n", d->sync_serial));
CHECK_SHOW(show("<tr><td>write_serial</td><td>%lu</tr>\n", d->write_serial));
CHECK_SHOW(show("<tr><td>header length</td><td>%lu</tr>\n", d->hlen));
+ CHECK_SHOW(show("<tr><td>fragment type</td><td>%lu</tr>\n", d->ftype));
+ CHECK_SHOW(show("<tr><td>fragment table length</td><td>%lu</tr>\n", d->flen));
CHECK_SHOW(show("<tr><td>No of Alternates</td><td>%d</td></tr>\n", alt_count));
CHECK_SHOW(show("<tr><td>Action</td>\n"
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePagesInternal.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePagesInternal.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePagesInternal.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePagesInternal.cc Fri Dec 18 16:47:37 2009
@@ -153,23 +153,15 @@
sprintf(nbytes, "%d", vc->vio.nbytes);
sprintf(todo, "%d", vc->vio.ntodo());
-#if 1
- if (vc->f.http_request && vc->request.valid()) {
+ if (vc->f.frag_type == CACHE_FRAG_TYPE_HTTP && vc->request.valid()) {
URL *u = vc->request.url_get(&uu);
u->print(url, 8000, &ib, &xd);
url[ib] = 0;
- }
- //else if (vc->vector.get(vc->alternate_index)->valid()) {
- // URL* u = vc->vector.get(vc->alternate_index)->request_url_get(&uu);
- // u->print(url, 8000, &ib, &xd);
- // url[ib] = 0;
- //}
- else if (vc->alternate.valid()) {
+ } else if (vc->alternate.valid()) {
URL *u = vc->alternate.request_url_get(&uu);
u->print(url, 8000, &ib, &xd);
url[ib] = 0;
} else
-#endif
vc->key.string(url);
CHECK_SHOW(show("<tr>" "<td>%s</td>" // operation
"<td>%s</td>" // Part
@@ -279,7 +271,7 @@
agg_todo++;
CHECK_SHOW(show("<tr>" "<td>%s</td>" // ID
"<td>%d</td>" // blocks
- "<td>%d</td>" // directory entries
+ "<td>%lld</td>" // directory entries
"<td>%d</td>" // write position
"<td>%d</td>" // write agg to do
"<td>%d</td>" // write agg to do size
@@ -291,7 +283,7 @@
"</tr>\n",
p->hash_id,
(int) ((p->len - (p->start - p->skip)) / INK_BLOCK_SIZE),
- p->buckets * DIR_DEPTH * DIR_SEGMENTS,
+ (inku64)(p->buckets * DIR_DEPTH * p->segments),
(int) ((p->header->write_pos - p->start) / INK_BLOCK_SIZE),
agg_todo,
p->agg_todo_size,
@@ -330,7 +322,7 @@
"<td>%d</td>"
"<td>%d</td>" "<td>%d</td>" "<td>%d</td>" "</tr>\n", free, used, empty, valid, agg_valid, avg_size));
seg_index++;
- if (seg_index < DIR_SEGMENTS)
+ if (seg_index < p->segments)
CONT_SCHED_LOCK_RETRY(this);
else {
CHECK_SHOW(show("</table>\n"));
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc?rev=892310&r1=892309&r2=892310&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc Fri Dec 18 16:47:37 2009
@@ -81,7 +81,7 @@
goto Ldone;
}
Lcont:
- segment = 0;
+ fragment = 0;
SET_HANDLER(&CacheVC::scanObject);
eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
return EVENT_CONT;
@@ -113,14 +113,14 @@
if (_action.cancelled)
return free_CacheVC(this);
- MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock) {
mutex->thread_holding->schedule_in_local(this, MUTEX_RETRY_DELAY);
return EVENT_CONT;
}
- if (!segment) { // initialize for first read
- segment = 1;
+ if (!fragment) { // initialize for first read
+ fragment = 1;
io.aiocb.aio_offset = part_offset_to_offset(part, 0);
io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
io.aiocb.aio_buf = buf->data();
@@ -140,22 +140,22 @@
int i;
bool changed;
- if (doc->magic != DOC_MAGIC || !doc->hlen)
+ if (doc->magic != DOC_MAGIC || doc->ftype != CACHE_FRAG_TYPE_HTTP || !doc->hlen)
goto Lskip;
last_collision = NULL;
while (1) {
if (!dir_probe(&doc->first_key, part, &dir, &last_collision))
goto Lskip;
- if (!dir_agg_valid(part, &dir) || !dir.head ||
+ if (!dir_agg_valid(part, &dir) || !dir_head(&dir) ||
(part_offset(part, &dir) != io.aiocb.aio_offset + ((char *) doc - buf->data())))
continue;
break;
}
- if ((char *) doc - buf->data() + sizeofDoc + doc->hlen > (int) io.aiocb.aio_nbytes)
+ if (doc->data() - buf->data() > (int) io.aiocb.aio_nbytes)
goto Lskip;
{
- char *tmp = doc->hdr;
+ char *tmp = doc->hdr();
int len = doc->hlen;
while (len > 0) {
int r = HTTPInfo::unmarshal(tmp, len, buf._ptr());
@@ -167,7 +167,7 @@
tmp += r;
}
}
- if (vector.get_handles(doc->hdr, doc->hlen) != doc->hlen)
+ if (vector.get_handles(doc->hdr(), doc->hlen) != doc->hlen)
goto Lskip;
changed = false;
hostinfo_copied = 0;
@@ -224,11 +224,10 @@
// force remove even if there is a writer
cacheProcessor.remove(this, &doc->first_key, true, false, CACHE_FRAG_TYPE_HTTP, (char *) hname, hlen);
return EVENT_CONT;
-// dir_delete(&doc->first_key, part, &dir);
} else {
offset = (char *) doc - buf->data();
write_len = 0;
- f.http_request = 1;
+ frag_type = CACHE_FRAG_TYPE_HTTP;
f.use_first_key = 1;
f.evac_vector = 1;
first_key = key = doc->first_key;
@@ -306,71 +305,69 @@
return scanObject(EVENT_IMMEDIATE, 0);
}
}
- MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
- if (!lock)
- VC_SCHED_LOCK_RETRY();
-
- Debug("cache_scan", "trying for writer lock");
- if (part->open_write(this, false, 1)) {
- writer_lock_retry++;
- SET_HANDLER(&CacheVC::scanOpenWrite);
- mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
- return EVENT_CONT;
- }
-
- ink_debug_assert(this->od);
- // put all the alternates in the open directory vector
- int alt_count = vector.count();
- for (int i = 0; i < alt_count; i++) {
- write_vector->insert(vector.get(i));
- }
- od->writing_vec = 1;
- vector.clear(false);
- // check that the directory entry was not overwritten
- // if so return failure
- Debug("cache_scan", "got writer lock");
- Dir *l = NULL;
- Dir d;
- Doc *doc = (Doc *) (buf->data() + offset);
- offset = (char *) doc - buf->data() + round_to_approx_size(doc->len);
- // if the doc contains some data, then we need to create
- // a new directory entry for this fragment. Remember the
- // offset and the key in earliest_key
- dir_assign(&od->first_dir, &dir);
- if (doc->total_len) {
- dir_assign(&od->single_doc_dir, &dir);
- dir_set_tag(&od->single_doc_dir, DIR_MASK_TAG(doc->key.word(1)));
- od->single_doc_key = doc->key;
- od->move_resident_alt = 1;
- }
+ int ret = 0;
+ {
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ if (!lock)
+ VC_SCHED_LOCK_RETRY();
+
+ Debug("cache_scan", "trying for writer lock");
+ if (part->open_write(this, false, 1)) {
+ writer_lock_retry++;
+ SET_HANDLER(&CacheVC::scanOpenWrite);
+ mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
+ return EVENT_CONT;
+ }
- while (1) {
- if (!dir_probe(&first_key, part, &d, &l)) {
- part->close_write(this);
- _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, 0);
- SET_HANDLER(&CacheVC::scanObject);
- return handleEvent(EVENT_IMMEDIATE, 0);
+ ink_debug_assert(this->od);
+ // put all the alternates in the open directory vector
+ int alt_count = vector.count();
+ for (int i = 0; i < alt_count; i++) {
+ write_vector->insert(vector.get(i));
}
-/*
- if (!dir_agg_valid(part, &d) || !d.head ||
- (part_offset(part, &d) !=
- io.aiocb.aio_offset + ((char*)doc - buf->data()))) {
- Debug("cache_scan","dir entry is not valid");
- continue;
+ od->writing_vec = 1;
+ vector.clear(false);
+ // check that the directory entry was not overwritten
+ // if so return failure
+ Debug("cache_scan", "got writer lock");
+ Dir *l = NULL;
+ Dir d;
+ Doc *doc = (Doc *) (buf->data() + offset);
+ offset = (char *) doc - buf->data() + round_to_approx_size(doc->len);
+ // if the doc contains some data, then we need to create
+ // a new directory entry for this fragment. Remember the
+ // offset and the key in earliest_key
+ dir_assign(&od->first_dir, &dir);
+ if (doc->total_len) {
+ dir_assign(&od->single_doc_dir, &dir);
+ dir_set_tag(&od->single_doc_dir, doc->key.word(2));
+ od->single_doc_key = doc->key;
+ od->move_resident_alt = 1;
}
-*/
- if (*((inku64 *) & dir) != *((inku64 *) & d)) {
- Debug("cache_scan", "dir entry has changed");
- continue;
+
+ while (1) {
+ if (!dir_probe(&first_key, part, &d, &l)) {
+ part->close_write(this);
+ _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, 0);
+ SET_HANDLER(&CacheVC::scanObject);
+ return handleEvent(EVENT_IMMEDIATE, 0);
+ }
+ if (memcmp(&dir, &d, SIZEOF_DIR)) {
+ Debug("cache_scan", "dir entry has changed");
+ continue;
+ }
+ break;
}
- break;
- }
- // the document was not modified
- // we are safe from now on as we hold the
- // writer lock on the doc
- SET_HANDLER(&CacheVC::scanUpdateDone);
- return do_write();
+ // the document was not modified
+ // we are safe from now on as we hold the
+ // writer lock on the doc
+ SET_HANDLER(&CacheVC::scanUpdateDone);
+ ret = do_write_call();
+ }
+ if (ret == EVENT_RETURN)
+ return handleEvent(AIO_EVENT_DONE, 0);
+ return ret;
}
int
@@ -381,7 +378,7 @@
Debug("cache_scan_truss", "inside %p:scanUpdateDone", this);
cancel_trigger();
// get partition lock
- MUTEX_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
+ CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (lock) {
// insert a directory entry for the previous fragment
dir_overwrite(&first_key, part, &dir, &od->first_dir, false);