You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2010/01/05 22:39:26 UTC
svn commit: r896234 - in
/incubator/trafficserver/traffic/branches/dev/iocore: aio/AIO.cc
aio/I_AIO.h aio/P_AIO.h cache/Cache.cc cache/CacheDir.cc cache/CacheDisk.cc
cache/CachePart.cc cache/CacheRead.cc cache/CacheWrite.cc
cache/P_CacheInternal.h
Author: jplevyak
Date: Tue Jan 5 21:39:25 2010
New Revision: 896234
URL: http://svn.apache.org/viewvc?rev=896234&view=rev
Log:
TS-102: Cleanup AIO: the aio operation now takes a 'thread' which is either the
thread to be called back on (via schedule_imm) or one of two special values
AIO_CALLBACK_THREAD_AIO which indicates that the callback should occur on the
AIO thread (in the case of the default THREAD aio type) or AIO_CALLBACK_THREAD_ANY
which indicates that the callback should occur on some EThread. Also removed
INKAIO ifdefs as that code is long dead.
Modified:
incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc
incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h
incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h
incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc
incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h
Modified: incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc Tue Jan 5 21:39:25 2010
@@ -21,18 +21,13 @@
limitations under the License.
*/
-/****************************************************************************
-
- Async Disk IO operations.
-
-
-
- ****************************************************************************/
+/*
+ * Async Disk IO operations.
+ */
#include "P_AIO.h"
#define MAX_DISKS_POSSIBLE 100
-#define SLEEP_TIME 100
// globals
@@ -46,7 +41,6 @@
// Don't need to acquire this for searching the array
static ink_mutex insert_mutex;
RecInt cache_config_threads_per_disk = 12;
-static RecInt cache_config_aio_sleep_time = SLEEP_TIME;
Continuation *aio_err_callbck = 0;
// AIO Stats
@@ -55,209 +49,14 @@
inku64 aio_num_write = 0;
inku64 aio_bytes_written = 0;
-static void aio_move(AIO_Reqs * req);
-
-//////////////////////////////////////////////////////////////////////////
-/////////////// INKIO /////////////////
-//////////////////////////////////////////////////////////////////////////
-
-#ifdef INKDISKAIO
-#undef AIO_MODE
-#define AIO_MODE AIO_MODE_THREAD
-#define AIO_PENDING_WATERMARK 400
-#define AIO_PENDING_WARNING 1000
-ink32 write_thread_index = 0;
-#define NOS_WRITE_THREADS 10
-int nos_write_threads = NOS_WRITE_THREADS;
-
-#define MY_AIO_THREAD (get_dmid(this_ethread()))
-#define DISK_WRITE_THREAD ((this_ethread()->tt == DEDICATED))
-
-
-inline int
-my_aio_thread(void)
-{
- ink_assert(!DISK_WRITE_THREAD);
- int ret = write_thread_index % nos_write_threads;
- ink_atomic_increment(&write_thread_index, 1);
- ink_atomic_swap(&write_thread_index, write_thread_index % nos_write_threads);
- return (this_ethread()->id * nos_write_threads + ret);
-}
-
-
-unsigned int last_pending_report = 0;
-int pending_report_interval = 10000000;
-int inkdiskio_watermark = AIO_PENDING_WATERMARK;
-volatile int aio_pending = 0;
-volatile int pending_size = 0;
-int aio_queued = 0;
-volatile int max_aio_queued = 0;
-
-
-#include "inkaio.h"
-
-
-#define DISK_PERIOD -HRTIME_MSECONDS(11)
-static void aio_insert(AIOCallback * op, AIO_Reqs * req);
-int
-disk_io_handler(kcall_t * ioep)
-{
- AIOCallback *op = (AIOCallback *) ioep->cookie;
-
- struct ink_aiocb_t *a = &op->aiocb;
- ink_assert(op->action.mutex);
- ink_assert(a->aio_nbytes > 0);
- if (op->aiocb.aio_lio_opcode == LIO_READ || op->aiocb.aio_lio_opcode == LIO_WRITE) {
- ink_atomic_increment(&pending_size, -op->aiocb.aio_nbytes);
- AIO_Reqs *req = aio_reqs[MY_AIO_THREAD];
- ink_atomic_increment(&req->pending, -1);
- }
- if (ioep->type == INKAIO_FLUSH) {
- Warning("AIO ERROR \n");
- op->aio_result = -1;
- } else
- op->aio_result = op->aiocb.aio_nbytes; // aio_result is not used here!
- op->link.prev = NULL;
- op->link.next = NULL;
- op->mutex = op->action.mutex;
- EThread *thread = this_ethread();
- if (a->aio_lio_opcode == LIO_READ) {
- eventProcessor.schedule_imm(op);
- } else {
- MUTEX_TRY_LOCK(lock, op->action.mutex, thread);
- if (lock)
- op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
- else {
- eventProcessor.schedule_imm(NEW(new AIOMissEvent(op->action.mutex, op)));
- }
- }
- return 0;
-}
-
-
-class DiskWriteMonitor:public Continuation
-{
-public:
- int my_id;
- int monitor_main(int event, Event * e);
- DiskWriteMonitor(ProxyMutex * m, int id):Continuation(m), my_id(id)
- {
- SET_HANDLER(&DiskWriteMonitor::monitor_main);
- };
-};
-
-
-inline void
-drain_thread_queue(int thread_id)
-{
- AIO_Reqs *req = aio_reqs[thread_id];
- MUTEX_TRY_LOCK(lock, req->list_mutex, this_ethread());
- if (!lock)
- return;
- bool run_once = false;
- while (req->pending < inkdiskio_watermark && (req->http_aio_todo.tail)) {
- AIOCallback *op1;
- struct ink_aiocb_t *a1;
- op1 = (AIOCallback *) req->http_aio_todo.pop();
- if (!op1)
- break;
- a1 = &op1->aiocb;
- if (op1->aiocb.aio_lio_opcode == LIO_READ) {
- ink_atomic_increment(&pending_size, op1->aiocb.aio_nbytes);
- if (inkaio_aioread(get_kcb(this_ethread()), (void *) op1, a1->aio_fildes,
- ((char *) a1->aio_buf), a1->aio_nbytes, a1->aio_offset)) {
- // FIXME
- ink_release_assert(false);
- }
- } else {
- ink_atomic_increment(&pending_size, op1->aiocb.aio_nbytes);
- if (inkaio_aiowrite(get_kcb(this_ethread()), (void *) op1, a1->aio_fildes,
- ((char *) a1->aio_buf), a1->aio_nbytes, a1->aio_offset)) {
- // FIXME
- ink_release_assert(false);
- }
- }
- if (!run_once)
- run_once = true;
- ink_atomic_increment(&req->pending, 1);
- ink_atomic_increment(&req->queued, -1);
- }
- if (run_once)
- inkaio_submit(get_kcb(this_ethread()));
-}
-
-int
-DiskWriteMonitor::monitor_main(int event, Event * e)
-{
- static struct pollfd pfd;
- static int poll_ret;
- static AIO_Reqs *req = aio_reqs[my_id];
- EThread *t = this_ethread();
- // create a queue for kcalls
- if (get_dm(this_ethread()) != this) {
- *((Continuation **) ETHREAD_GET_PTR(t, dm_offset)) = this;
- *((int *) ETHREAD_GET_PTR(t, dmid_offset)) = my_id;
- }
- *((INKAIOCB **) ETHREAD_GET_PTR(t, kcb_offset)) = inkaio_create(0, disk_io_handler);
- if (get_kcb(t) == 0) {
- Error("inkaio_create : could not create");
- exit(-1);
- } else
- printf("Created writed thread %d \n", my_id);
-
- pfd.fd = inkaio_filno(get_kcb(this_ethread()));
- pfd.events = POLLIN;
- for (;;) {
- poll_ret = poll(NULL, 0, 10);
- {
- MUTEX_TRY_LOCK(lock, req->list_mutex, this_ethread());
- if (lock) {
- if (!INK_ATOMICLIST_EMPTY(req->aio_temp_list)) {
- aio_move(req);
- }
- inkaio_dispatch((get_kcb(e->ethread)));
- drain_thread_queue(my_id);
- }
- }
- }
- return 0;
-}
+static void aio_move(AIO_Reqs *req);
-void
-initialize_thread_for_diskaio(EThread * thread)
-{
- AIO_Reqs *request;
- if (ts_config_with_inkdiskio) {
- for (int i = 0; i < nos_write_threads; i++) {
- request = (AIO_Reqs *) xmalloc(sizeof(AIO_Reqs));
- memset(request, 0, sizeof(AIO_Reqs));
-
- INK_WRITE_MEMORY_BARRIER;
-
- // ink_cond_init(&request->aio_cond);
- // ink_mutex_init(&request->aio_mutex, NULL);
- request->list_mutex = new_ProxyMutex();
- ink_atomiclist_init(&request->aio_temp_list, "temp_list", (unsigned) &((AIOCallback *) 0)->link);
-
- request->index = 0;
- request->filedes = -1;
- aio_reqs[nos_write_threads * thread->id + i] = request;
- DiskWriteMonitor *dm = new DiskWriteMonitor(new_ProxyMutex(),
- nos_write_threads * thread->id + i);
- eventProcessor.spawn_thread(dm);
- }
- }
-}
-#endif
-
-
-
-////////////////////////////////////////////////////////////////////////
-//////// Stats Stuff //////
-////////////////////////////////////////////////////////////////////////
+/*
+ * Stats
+ */
static int
-aio_stats_cb(const char *name, RecDataT data_type, RecData * data, RecRawStatBlock * rsb, int id)
+aio_stats_cb(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
(void) data_type;
(void) rsb;
@@ -272,7 +71,6 @@
RecGetGlobalRawStatSum(aio_rsb, id, &sum);
RecGetGlobalRawStatCount(aio_rsb, id, &count);
-
ink64 time_diff = ink_hrtime_to_msec(now - count);
if (time_diff == 0) {
data->rec_float = 0.0;
@@ -282,18 +80,15 @@
case AIO_STAT_READ_PER_SEC:
new_val = aio_num_read;
break;
-
case AIO_STAT_WRITE_PER_SEC:
new_val = aio_num_write;
break;
-
case AIO_STAT_KB_READ_PER_SEC:
new_val = aio_bytes_read >> 10;
break;
case AIO_STAT_KB_WRITE_PER_SEC:
new_val = aio_bytes_written >> 10;
break;
-
default:
ink_assert(0);
}
@@ -314,13 +109,10 @@
int
AIOTestData::ink_aio_stats(int event, void *d)
{
-
ink_hrtime now = ink_get_hrtime();
double time_msec = (double) (now - start) / (double) HRTIME_MSECOND;
- for (int i = 0; i < num_filedes; i++) {
-
+ for (int i = 0; i < num_filedes; i++)
printf("%0.2f\t%i\t%i\t%i\n", time_msec, aio_reqs[i]->filedes, aio_reqs[i]->pending, aio_reqs[i]->queued);
- }
printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);
eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
return EVENT_DONE;
@@ -328,9 +120,9 @@
#endif // AIO_STATS
-////////////////////////////////////////////////////////////////////////
-//////// Common Stuff //////
-////////////////////////////////////////////////////////////////////////
+/*
+ * Common
+ */
AIOCallback *
new_AIOCallback(void)
{
@@ -338,7 +130,7 @@
};
void
-ink_aio_set_callback(Continuation * callback)
+ink_aio_set_callback(Continuation *callback)
{
aio_err_callbck = callback;
}
@@ -360,45 +152,14 @@
"proxy.process.cache.KB_write_per_sec",
RECD_FLOAT, RECP_NULL, (int) AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
-#ifdef INKDISKAIO
- kcb_offset = eventProcessor.allocate(sizeof(INKAIOCB *));
- dm_offset = eventProcessor.allocate(sizeof(Continuation *));
- ink_assert(dm_offset - kcb_offset >= sizeof(Continuation *));
- dmid_offset = eventProcessor.allocate(sizeof(int));
- ink_assert(dmid_offset - dm_offset >= sizeof(int));
- IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.net.enable_ink_disk_io", 0, RECU_RESTART_TS, RECC_NULL, NULL);
-
- IOCORE_ReadConfigInteger(ts_config_with_inkdiskio, "proxy.config.net.enable_ink_disk_io");
-
- IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.net.ink_disk_io_watermark", 400, RECU_NULL, RECC_NULL, NULL);
- IOCORE_ReadConfigInteger(inkdiskio_watermark, "proxy.config.net.ink_disk_io_watermark");
-
- IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.net.ink_aio_write_threads", 10, RECU_NULL, RECC_NULL, NULL);
- IOCORE_ReadConfigInteger(nos_write_threads, "proxy.config.net.ink_aio_write_threads");
-
- if (ts_config_with_inkdiskio) {
- if (FILE * fp = fopen("/dev/inkaio", "rw")) {
- fclose(fp);
- } else {
- Warning("Inkio for disk cannot be enabled: /dev/kcalls does not exist");
- ts_config_with_inkdiskio = 0;
- }
- }
-#endif
if (!ts_config_with_inkdiskio) {
memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
ink_mutex_init(&insert_mutex, NULL);
IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.cache.threads_per_disk", 4, RECU_NULL, RECC_NULL, NULL);
IOCORE_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
-
- IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.cache.aio_sleep_time", 100, RECU_DYNAMIC, RECC_NULL, NULL);
- IOCORE_ReadConfigInteger(cache_config_aio_sleep_time, "proxy.config.cache.aio_sleep_time");
}
-
-
}
-
int
ink_aio_start()
{
@@ -410,10 +171,6 @@
}
-////////////////////////////////////////////////////////////////////////
-//////// Unix Stuff //////
-////////////////////////////////////////////////////////////////////////
-
static void *aio_thread_main(void *arg);
struct AIOThreadInfo:public Continuation
@@ -422,7 +179,7 @@
AIO_Reqs *req;
int sleep_wait;
- int start(int event, Event * e)
+ int start(int event, Event *e)
{
(void) event;
(void) e;
@@ -430,7 +187,7 @@
return EVENT_DONE;
}
- AIOThreadInfo(AIO_Reqs * thr_req, int sleep):Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
+ AIOThreadInfo(AIO_Reqs *thr_req, int sleep):Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
{
SET_HANDLER(&AIOThreadInfo::start);
}
@@ -489,12 +246,8 @@
/* insert a request into either aio_todo or http_todo queue. aio_todo
list is kept sorted */
static void
-aio_insert(AIOCallback * op, AIO_Reqs * req)
+aio_insert(AIOCallback *op, AIO_Reqs *req)
{
-#ifdef INKDISKAIO
- op->aiocb.aio_reqprio = AIO_LOWEST_PRIORITY;
-#endif
-
#ifdef AIO_STATS
num_requests++;
req->queued++;
@@ -519,15 +272,12 @@
/* Either the queue was empty or this request has the highest priority */
req->aio_todo.push(op);
-
}
- return;
-
}
/* move the request from the atomic list to the queue */
static void
-aio_move(AIO_Reqs * req)
+aio_move(AIO_Reqs *req)
{
AIOCallback *next = NULL, *prev = NULL, *cb = (AIOCallback *) ink_atomiclist_popall(&req->aio_temp_list);
/* flip the list */
@@ -551,7 +301,7 @@
/* queue the new request */
static void
-aio_queue_req(AIOCallbackInternal * op)
+aio_queue_req(AIOCallbackInternal *op)
{
int thread_ndx = 0;
AIO_Reqs *req = op->aio_req;
@@ -560,22 +310,6 @@
#ifdef AIO_STATS
ink_atomic_increment((int *) &data->num_req, 1);
#endif
-#ifdef INKDISKAIO
- if (ts_config_with_inkdiskio) {
- if (DISK_WRITE_THREAD) {
- req = aio_reqs[MY_AIO_THREAD];
- ink_assert(req);
- } else {
- req = aio_reqs[my_aio_thread()];
- if (!req) {
- initialize_thread_for_diskaio(this_ethread());
- req = aio_reqs[my_aio_thread()];
- }
- ink_assert(req);
- }
- goto Lacquirelock;
- }
-#endif
if (!req || req->filedes != op->aiocb.aio_fildes) {
/* search for the matching file descriptor */
for (; thread_ndx < num_filedes; thread_ndx++) {
@@ -608,13 +342,7 @@
}
op->aio_req = req;
}
-#ifndef INKDISKAIO
- ink_assert(req->filedes == op->aiocb.aio_fildes);
-#endif
ink_atomic_increment(&req->requests_queued, 1);
-#ifdef INKDISKAIO
-Lacquirelock:
-#endif
if (!ink_mutex_try_acquire(&req->aio_mutex)) {
#ifdef AIO_STATS
ink_atomic_increment(&data->num_temp, 1);
@@ -635,18 +363,13 @@
}
static inline int
-cache_op(AIOCallbackInternal * op)
+cache_op(AIOCallbackInternal *op)
{
bool read = (op->aiocb.aio_lio_opcode == LIO_READ) ? 1 : 0;
for (; op; op = (AIOCallbackInternal *) op->then) {
ink_aiocb_t *a = &op->aiocb;
int err, res = 0;
-#ifdef DEBUG
- if (op->sleep_time) {
- ink_sleep(op->sleep_time);
- }
-#endif
while (a->aio_nbytes - res > 0) {
do {
if (read)
@@ -655,10 +378,8 @@
err = ink_pwrite(a->aio_fildes, ((char *) a->aio_buf) + res, a->aio_nbytes - res, a->aio_offset + res);
} while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM));
if (err <= 0) {
-#ifdef DIAGS_MODULARIZED
Warning("cache disk operation failed %s %d %d\n",
(a->aio_lio_opcode == LIO_READ) ? "READ" : "WRITE", err, errno);
-#endif
op->aio_result = -errno;
return (err);
}
@@ -671,7 +392,7 @@
}
int
-ink_aio_read(AIOCallback * op)
+ink_aio_read(AIOCallback *op)
{
op->aiocb.aio_lio_opcode = LIO_READ;
switch (AIO_MODE) {
@@ -690,24 +411,15 @@
cache_op((AIOCallbackInternal *) op);
op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
break;
- case AIO_MODE_THREAD:{
- aio_queue_req((AIOCallbackInternal *) op);
- break;
- }
-#if 0
- case AIO_MODE_INK:
- ink_disk_read(op->aiocb.aio_fildes, (char *) op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset, op);
- break;
- default:
- ASSERT(0);
+ case AIO_MODE_THREAD:
+ aio_queue_req((AIOCallbackInternal *) op);
break;
-#endif
}
return 1;
}
int
-ink_aio_write(AIOCallback * op)
+ink_aio_write(AIOCallback *op)
{
op->aiocb.aio_lio_opcode = LIO_WRITE;
switch (AIO_MODE) {
@@ -726,18 +438,9 @@
cache_op((AIOCallbackInternal *) op);
op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
break;
- case AIO_MODE_THREAD:{
- aio_queue_req((AIOCallbackInternal *) op);
- break;
- }
-#if 0
- case IO_MODE_INK:
- ink_disk_write(op->aiocb.aio_fildes, (char *) op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset, op);
- break;
- default:
- ASSERT(0);
+ case AIO_MODE_THREAD:
+ aio_queue_req((AIOCallbackInternal *) op);
break;
-#endif
}
return 1;
}
@@ -747,7 +450,6 @@
{
AIOThreadInfo *thr_info = (AIOThreadInfo *) arg;
AIO_Reqs *my_aio_req = (AIO_Reqs *) thr_info->req;
- int timed_wait = thr_info->sleep_wait;
AIO_Reqs *current_req = NULL;
AIOCallback *op = NULL;
ink_mutex_acquire(&my_aio_req->aio_mutex);
@@ -787,37 +489,18 @@
#endif
op->link.prev = NULL;
op->link.next = NULL;
- // make op continuation share op->action's mutex
op->mutex = op->action.mutex;
- /* why do we callback on AIO thread only if its a write??
- See INKqa07855. The problem is that with a lot of users,
- the Net threads take a lot of time (as high as a second)
- to come back to ink_aio_complete. This means that the
- partition can only issue 1 i/o per second. To
- get around this problem, we have the aio threads callback
- the partitions directly. The partition issues another i/o
- on the same thread (assuming there is enough stuff to be
- written). The partition is careful not to callback the VC's
- and not schedule any events on the thread.
- It does not matter for reads because its generally the
- CacheVC's that issue reads and they have to do a fair
- bit of computation (go through the docheader, call http
- state machine back, etc) before they can issue another
- read.
- */
- if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
+ if (op->thread == AIO_CALLBACK_THREAD_AIO) {
MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
if (!op->action.cancelled)
op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
- } else
+ } else if (op->thread == AIO_CALLBACK_THREAD_ANY)
+ eventProcessor.schedule_imm(op);
+ else
op->thread->schedule_imm(op);
ink_mutex_acquire(&my_aio_req->aio_mutex);
} while (1);
- if (timed_wait) {
- timespec ts = ink_based_hrtime_to_timespec(ink_get_hrtime() + HRTIME_MSECONDS(cache_config_aio_sleep_time));
- ink_cond_timedwait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex, &ts);
- } else
- ink_cond_wait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex);
+ ink_cond_wait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex);
}
return 0;
}
Modified: incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h Tue Jan 5 21:39:25 2010
@@ -50,11 +50,14 @@
#define AIO_MODE_AIO 0
#define AIO_MODE_SYNC 1
#define AIO_MODE_THREAD 2
-#define AIO_MODE_INK 3
#define AIO_MODE AIO_MODE_THREAD
+// AIOCallback::thread special values
+#define AIO_CALLBACK_THREAD_ANY ((EThread*)0) // any regular event thread
+#define AIO_CALLBACK_THREAD_AIO ((EThread*)-1)
+
#define AIO_LOWEST_PRIORITY 0
-#define AIO_DEFAULT_PRIORITY AIO_LOWEST_PRIORTY
+#define AIO_DEFAULT_PRIORITY AIO_LOWEST_PRIORITY
struct AIOCallback:Continuation
{
@@ -67,8 +70,9 @@
int aio_result;
int ok();
- // AIOCallback();
- virtual void AIOCallback_is_an_abstract_class() = 0;
+ AIOCallback() : thread(AIO_CALLBACK_THREAD_ANY), then(0) {
+ aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+ }
};
void ink_aio_init(ModuleVersion version);
Modified: incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h Tue Jan 5 21:39:25 2010
@@ -39,7 +39,8 @@
#include "P_EventSystem.h"
#include "I_AIO.h"
-// #define AIO_STATS
+// for debugging
+// #define AIO_STATS 1
#undef AIO_MODULE_VERSION
#define AIO_MODULE_VERSION makeModuleVersion(AIO_MODULE_MAJOR_VERSION,\
@@ -53,9 +54,6 @@
AIO_Reqs *aio_req;
ink_hrtime sleep_time;
int io_complete(int event, void *data);
- void AIOCallback_is_an_abstract_class()
- {
- }
AIOCallbackInternal()
{
const size_t to_zero = sizeof(AIOCallbackInternal)
@@ -90,9 +88,6 @@
/* Atomic list to temporarily hold the request if the
lock for a particular queue cannot be acquired */
InkAtomicList aio_temp_list;
-#ifdef INKDISKAIO
- ProxyMutexPtr list_mutex;
-#endif
ink_mutex aio_mutex;
ink_cond aio_cond;
int index; /* position of this struct in the aio_reqs array */
@@ -110,55 +105,15 @@
int num_temp;
int num_queue;
ink_hrtime start;
- AIOTestData():Continuation(new_ProxyMutex()), num_req(0), num_temp(0), num_queue(0)
- {
- start = ink_get_hrtime();
- SET_HANDLER(&AIOTestData::ink_aio_stats);
- }
int ink_aio_stats(int event, void *data);
-};
-#endif
-
-#ifdef INKDISKAIO
-#include "inkaio.h"
-void initialize_thread_for_diskaio(EThread *);
-struct AIOMissEvent:Continuation
-{
- AIOCallback *cb;
-
- int mainEvent(int event, Event * e)
+ AIOTestData():Continuation(new_ProxyMutex()), num_req(0), num_temp(0), num_queue(0)
{
- if (!cb->action.cancelled)
- cb->action.continuation->handleEvent(AIO_EVENT_DONE, cb);
- delete this;
- return EVENT_DONE;
- }
-
- AIOMissEvent(ProxyMutex * amutex, AIOCallback * acb)
- : Continuation(amutex), cb(acb)
- {
- SET_HANDLER(&AIOMissEvent::mainEvent);
+ start = ink_get_hrtime();
+ SET_HANDLER(&AIOTestData::ink_aio_stats);
}
};
-static ink_off_t kcb_offset, dm_offset, dmid_offset;
-inline INKAIOCB *
-get_kcb(EThread * t)
-{
- return *((INKAIOCB **) ETHREAD_GET_PTR(t, kcb_offset));
-}
-
-inline Continuation *
-get_dm(EThread * t)
-{
- return *((Continuation **) ETHREAD_GET_PTR(t, dm_offset));
-}
-inline int
-get_dmid(EThread * t)
-{
- return *((int *) ETHREAD_GET_PTR(t, dmid_offset));
-}
#endif
enum aio_stat_enum
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc Tue Jan 5 21:39:25 2010
@@ -955,12 +955,11 @@
SET_HANDLER(&Part::handle_dir_clear);
io.aiocb.aio_fildes = fd;
- io.aiocb.aio_reqprio = 0;
io.aiocb.aio_buf = raw_dir;
io.aiocb.aio_nbytes = dir_len;
io.aiocb.aio_offset = skip;
io.action = this;
- io.thread = this_ethread();
+ io.thread = AIO_CALLBACK_THREAD_ANY;
io.then = 0;
ink_assert(ink_aio_write(&io));
return 0;
@@ -1038,7 +1037,6 @@
for (i = 0; i < 4; i++) {
AIOCallback *aio = &(init_info->part_aio[i]);
aio->aiocb.aio_fildes = fd;
- aio->aiocb.aio_reqprio = 0;
aio->aiocb.aio_buf = &(init_info->part_h_f[i * INK_BLOCK_SIZE]);
aio->aiocb.aio_nbytes = footerlen;
aio->action = this;
@@ -1365,9 +1363,8 @@
for (int i = 0; i < 3; i++) {
AIOCallback *aio = &(init_info->part_aio[i]);
aio->aiocb.aio_fildes = fd;
- aio->aiocb.aio_reqprio = 0;
aio->action = this;
- aio->thread = this_ethread();
+ aio->thread = AIO_CALLBACK_THREAD_ANY;
aio->then = (i < 2) ? &(init_info->part_aio[i + 1]) : 0;
}
int footerlen = ROUND_TO_BLOCK(sizeof(PartHeaderFooter));
@@ -1438,11 +1435,10 @@
}
io.aiocb.aio_fildes = fd;
- io.aiocb.aio_reqprio = 0;
io.aiocb.aio_nbytes = part_dirlen(this);
io.aiocb.aio_buf = raw_dir;
io.action = this;
- io.thread = this_ethread();
+ io.thread = AIO_CALLBACK_THREAD_ANY;
io.then = 0;
if (hf[0]->sync_serial == hf[1]->sync_serial &&
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc Tue Jan 5 21:39:25 2010
@@ -887,7 +887,7 @@
io.aiocb.aio_nbytes = n;
io.aiocb.aio_buf = b;
io.action = this;
- io.thread = mutex->thread_holding;
+ io.thread = AIO_CALLBACK_THREAD_ANY;
ink_assert(ink_aio_write(&io) >= 0);
}
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc Tue Jan 5 21:39:25 2010
@@ -67,20 +67,17 @@
return clearDisk();
}
-
SET_HANDLER(&CacheDisk::openStart);
io.aiocb.aio_offset = skip;
io.aiocb.aio_buf = (char *) header;
io.aiocb.aio_nbytes = header_len;
-
- io.thread = this_ethread();
+ io.thread = AIO_CALLBACK_THREAD_ANY;
ink_aio_read(&io);
return 0;
}
CacheDisk::~CacheDisk()
{
-
if (path) {
xfree(path);
for (int i = 0; i < (int) header->num_partitions; i++) {
@@ -104,13 +101,12 @@
int
CacheDisk::clearDisk()
{
-
delete_all_partitions();
io.aiocb.aio_offset = skip;
io.aiocb.aio_buf = header;
io.aiocb.aio_nbytes = header_len;
- io.thread = this_ethread();
+ io.thread = AIO_CALLBACK_THREAD_ANY;
ink_aio_write(&io);
return 0;
}
@@ -157,7 +153,6 @@
SET_HANDLER(&CacheDisk::openDone);
return openDone(EVENT_IMMEDIATE, 0);
-
}
int
@@ -178,11 +173,10 @@
int
CacheDisk::sync()
{
-
io.aiocb.aio_offset = skip;
io.aiocb.aio_buf = header;
io.aiocb.aio_nbytes = header_len;
- io.thread = this_ethread();
+ io.thread = AIO_CALLBACK_THREAD_ANY;
ink_aio_write(&io);
return 0;
}
@@ -207,7 +201,6 @@
DiskPartBlock *
CacheDisk::create_partition(int number, ink_off_t size_in_blocks, int scheme)
{
-
if (size_in_blocks == 0)
return NULL;
@@ -233,7 +226,6 @@
}
}
-
if (!p && !closest_match)
return NULL;
@@ -267,7 +259,6 @@
free_blocks->size += dpb->len;
free_space += dpb->len;
header->num_diskpart_blks++;
-
} else
header->num_free--;
@@ -296,7 +287,6 @@
disk_parts[i]->size = q->b->len;
header->num_partitions++;
}
-
return p;
}
@@ -304,7 +294,6 @@
int
CacheDisk::delete_partition(int number)
{
-
unsigned int i;
for (i = 0; i < header->num_partitions; i++) {
if (disk_parts[i]->part_number == number) {
@@ -340,7 +329,6 @@
void
CacheDisk::update_header()
{
-
unsigned int n = 0;
unsigned int i, j;
if (free_blocks) {
@@ -397,14 +385,12 @@
}
}
-
ink_assert(n == header->num_partitions);
}
DiskPart *
CacheDisk::get_diskpart(int part_number)
{
-
unsigned int i;
for (i = 0; i < header->num_partitions; i++) {
if (disk_parts[i]->part_number == part_number) {
@@ -417,7 +403,6 @@
int
CacheDisk::delete_all_partitions()
{
-
header->part_info[0].offset = start;
header->part_info[0].len = num_usable_blocks;
header->part_info[0].type = CACHE_NONE_TYPE;
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc Tue Jan 5 21:39:25 2010
@@ -125,7 +125,7 @@
io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
io.aiocb.aio_buf = buf->data();
io.action = this;
- io.thread = mutex->thread_holding;
+ io.thread = AIO_CALLBACK_THREAD_ANY;
goto Lread;
}
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc Tue Jan 5 21:39:25 2010
@@ -297,13 +297,11 @@
CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
if (!lock)
VC_SCHED_LOCK_RETRY();
+ od = part->open_read(&first_key); // recheck in case the lock failed
if (!od) {
- od = part->open_read(&first_key);
- if (!od) {
- write_vc = NULL;
- SET_HANDLER(&CacheVC::openReadStartHead);
- return openReadStartHead(event, e);
- }
+ write_vc = NULL;
+ SET_HANDLER(&CacheVC::openReadStartHead);
+ return openReadStartHead(event, e);
} else
ink_debug_assert(od == part->open_read(&first_key));
if (!write_vc) {
@@ -419,8 +417,8 @@
writer_buf = write_vc->blocks;
writer_offset = write_vc->offset;
length = write_vc->length;
- //copy the vector
- f.single_fragment = !write_vc->fragment; //single fragment doc
+ // copy the vector
+ f.single_fragment = !write_vc->fragment; // single fragment doc
docpos = 0;
earliest_key = write_vc->earliest_key;
ink_assert(earliest_key == key);
@@ -428,14 +426,11 @@
dir_clean(&first_dir);
dir_clean(&earliest_dir);
Debug("cache_read_agg", "%x key: %X %X: single fragment read", first_key.word(1), key.word(0));
-
MUTEX_RELEASE(writer_lock);
- //we've got everything....ready to roll!!
SET_HANDLER(&CacheVC::openReadFromWriterMain);
CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
return EVENT_DONE;
-
#endif //READ_WHILE_WRITER
}
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc Tue Jan 5 21:39:25 2010
@@ -729,7 +729,7 @@
io.aiocb.aio_buf = doc_evacuator->buf->data();
io.action = this;
- io.thread = mutex->thread_holding;
+ io.thread = AIO_CALLBACK_THREAD_ANY;
Debug("cache_evac", "evac_range evacuating %X %d", (int) dir_tag(&first->dir), (int) dir_offset(&first->dir));
SET_HANDLER(&Part::evacuateDocReadDone);
ink_assert(ink_aio_read(&io) >= 0);
@@ -1040,7 +1040,12 @@
io.aiocb.aio_buf = agg_buffer;
io.aiocb.aio_nbytes = agg_buf_pos;
io.action = this;
- io.thread = mutex->thread_holding;
+ /*
+ Callback on AIO thread so that we can issue a new write ASAP
+ as all writes are serialized in the partition. This is not necessary
for reads, which proceed independently.
+ */
+ io.thread = AIO_CALLBACK_THREAD_AIO;
SET_HANDLER(&Part::aggWriteDone);
ink_aio_write(&io);
Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h Tue Jan 5 21:39:25 2010
@@ -528,6 +528,7 @@
cont->io.mutex.clear();
cont->io.aio_result = 0;
cont->io.aiocb.aio_nbytes = 0;
+ cont->io.aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
#ifdef HTTP_CACHE
cont->request.reset();
cont->vector.clear();