You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by jp...@apache.org on 2010/01/05 22:39:26 UTC

svn commit: r896234 - in /incubator/trafficserver/traffic/branches/dev/iocore: aio/AIO.cc aio/I_AIO.h aio/P_AIO.h cache/Cache.cc cache/CacheDir.cc cache/CacheDisk.cc cache/CachePart.cc cache/CacheRead.cc cache/CacheWrite.cc cache/P_CacheInternal.h

Author: jplevyak
Date: Tue Jan  5 21:39:25 2010
New Revision: 896234

URL: http://svn.apache.org/viewvc?rev=896234&view=rev
Log:
TS-102: Clean up AIO. Each AIO operation now takes a 'thread', which is either
the thread to be called back on (via schedule_imm) or one of two special values:
AIO_CALLBACK_THREAD_AIO, which indicates that the callback should occur on the
AIO thread itself (with the default AIO_MODE_THREAD mode), or
AIO_CALLBACK_THREAD_ANY (the default), which indicates that the callback should
occur on any regular EThread.  Also removed the INKDISKAIO ifdefs and the
AIO_MODE_INK mode, as that code is long dead, along with the AIO-thread timed
wait controlled by proxy.config.cache.aio_sleep_time.

Modified:
    incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc
    incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h
    incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h
    incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc
    incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc
    incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc
    incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc
    incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc
    incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc
    incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h

Modified: incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/aio/AIO.cc Tue Jan  5 21:39:25 2010
@@ -21,18 +21,13 @@
   limitations under the License.
  */
 
-/****************************************************************************
-
-  Async Disk IO operations.
-
-  
-  
- ****************************************************************************/
+/*
+ * Async Disk IO operations.
+ */
 
 #include "P_AIO.h"
 
 #define MAX_DISKS_POSSIBLE 100
-#define SLEEP_TIME 100
 
 // globals
 
@@ -46,7 +41,6 @@
 // Don't need to acquire this for searching the array
 static ink_mutex insert_mutex;
 RecInt cache_config_threads_per_disk = 12;
-static RecInt cache_config_aio_sleep_time = SLEEP_TIME;
 Continuation *aio_err_callbck = 0;
 
 // AIO Stats
@@ -55,209 +49,14 @@
 inku64 aio_num_write = 0;
 inku64 aio_bytes_written = 0;
 
-static void aio_move(AIO_Reqs * req);
-
-//////////////////////////////////////////////////////////////////////////
-///////////////                    INKIO                 /////////////////
-//////////////////////////////////////////////////////////////////////////
-
-#ifdef INKDISKAIO
-#undef AIO_MODE
-#define AIO_MODE                         AIO_MODE_THREAD
-#define AIO_PENDING_WATERMARK   400
-#define AIO_PENDING_WARNING     1000
-ink32 write_thread_index = 0;
-#define NOS_WRITE_THREADS	   10
-int nos_write_threads = NOS_WRITE_THREADS;
-
-#define MY_AIO_THREAD		   (get_dmid(this_ethread()))
-#define DISK_WRITE_THREAD	   ((this_ethread()->tt == DEDICATED))
-
-
-inline int
-my_aio_thread(void)
-{
-  ink_assert(!DISK_WRITE_THREAD);
-  int ret = write_thread_index % nos_write_threads;
-  ink_atomic_increment(&write_thread_index, 1);
-  ink_atomic_swap(&write_thread_index, write_thread_index % nos_write_threads);
-  return (this_ethread()->id * nos_write_threads + ret);
-}
-
-
-unsigned int last_pending_report = 0;
-int pending_report_interval = 10000000;
-int inkdiskio_watermark = AIO_PENDING_WATERMARK;
-volatile int aio_pending = 0;
-volatile int pending_size = 0;
-int aio_queued = 0;
-volatile int max_aio_queued = 0;
-
-
-#include "inkaio.h"
-
-
-#define DISK_PERIOD                 -HRTIME_MSECONDS(11)
-static void aio_insert(AIOCallback * op, AIO_Reqs * req);
-int
-disk_io_handler(kcall_t * ioep)
-{
-  AIOCallback *op = (AIOCallback *) ioep->cookie;
-
-  struct ink_aiocb_t *a = &op->aiocb;
-  ink_assert(op->action.mutex);
-  ink_assert(a->aio_nbytes > 0);
-  if (op->aiocb.aio_lio_opcode == LIO_READ || op->aiocb.aio_lio_opcode == LIO_WRITE) {
-    ink_atomic_increment(&pending_size, -op->aiocb.aio_nbytes);
-    AIO_Reqs *req = aio_reqs[MY_AIO_THREAD];
-    ink_atomic_increment(&req->pending, -1);
-  }
-  if (ioep->type == INKAIO_FLUSH) {
-    Warning("AIO ERROR \n");
-    op->aio_result = -1;
-  } else
-    op->aio_result = op->aiocb.aio_nbytes;      // aio_result is not used here!
-  op->link.prev = NULL;
-  op->link.next = NULL;
-  op->mutex = op->action.mutex;
-  EThread *thread = this_ethread();
-  if (a->aio_lio_opcode == LIO_READ) {
-    eventProcessor.schedule_imm(op);
-  } else {
-    MUTEX_TRY_LOCK(lock, op->action.mutex, thread);
-    if (lock)
-      op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
-    else {
-      eventProcessor.schedule_imm(NEW(new AIOMissEvent(op->action.mutex, op)));
-    }
-  }
-  return 0;
-}
-
-
-class DiskWriteMonitor:public Continuation
-{
-public:
-  int my_id;
-  int monitor_main(int event, Event * e);
-    DiskWriteMonitor(ProxyMutex * m, int id):Continuation(m), my_id(id)
-  {
-    SET_HANDLER(&DiskWriteMonitor::monitor_main);
-  };
-};
-
-
-inline void
-drain_thread_queue(int thread_id)
-{
-  AIO_Reqs *req = aio_reqs[thread_id];
-  MUTEX_TRY_LOCK(lock, req->list_mutex, this_ethread());
-  if (!lock)
-    return;
-  bool run_once = false;
-  while (req->pending < inkdiskio_watermark && (req->http_aio_todo.tail)) {
-    AIOCallback *op1;
-    struct ink_aiocb_t *a1;
-    op1 = (AIOCallback *) req->http_aio_todo.pop();
-    if (!op1)
-      break;
-    a1 = &op1->aiocb;
-    if (op1->aiocb.aio_lio_opcode == LIO_READ) {
-      ink_atomic_increment(&pending_size, op1->aiocb.aio_nbytes);
-      if (inkaio_aioread(get_kcb(this_ethread()), (void *) op1, a1->aio_fildes,
-                         ((char *) a1->aio_buf), a1->aio_nbytes, a1->aio_offset)) {
-        // FIXME
-        ink_release_assert(false);
-      }
-    } else {
-      ink_atomic_increment(&pending_size, op1->aiocb.aio_nbytes);
-      if (inkaio_aiowrite(get_kcb(this_ethread()), (void *) op1, a1->aio_fildes,
-                          ((char *) a1->aio_buf), a1->aio_nbytes, a1->aio_offset)) {
-        // FIXME
-        ink_release_assert(false);
-      }
-    }
-    if (!run_once)
-      run_once = true;
-    ink_atomic_increment(&req->pending, 1);
-    ink_atomic_increment(&req->queued, -1);
-  }
-  if (run_once)
-    inkaio_submit(get_kcb(this_ethread()));
-}
-
-int
-DiskWriteMonitor::monitor_main(int event, Event * e)
-{
-  static struct pollfd pfd;
-  static int poll_ret;
-  static AIO_Reqs *req = aio_reqs[my_id];
-  EThread *t = this_ethread();
-  // create a queue for kcalls
-  if (get_dm(this_ethread()) != this) {
-    *((Continuation **) ETHREAD_GET_PTR(t, dm_offset)) = this;
-    *((int *) ETHREAD_GET_PTR(t, dmid_offset)) = my_id;
-  }
-  *((INKAIOCB **) ETHREAD_GET_PTR(t, kcb_offset)) = inkaio_create(0, disk_io_handler);
-  if (get_kcb(t) == 0) {
-    Error("inkaio_create : could not create");
-    exit(-1);
-  } else
-    printf("Created writed thread %d \n", my_id);
-
-  pfd.fd = inkaio_filno(get_kcb(this_ethread()));
-  pfd.events = POLLIN;
-  for (;;) {
-    poll_ret = poll(NULL, 0, 10);
-    {
-      MUTEX_TRY_LOCK(lock, req->list_mutex, this_ethread());
-      if (lock) {
-        if (!INK_ATOMICLIST_EMPTY(req->aio_temp_list)) {
-          aio_move(req);
-        }
-        inkaio_dispatch((get_kcb(e->ethread)));
-        drain_thread_queue(my_id);
-      }
-    }
-  }
-  return 0;
-}
+static void aio_move(AIO_Reqs *req);
 
-void
-initialize_thread_for_diskaio(EThread * thread)
-{
-  AIO_Reqs *request;
-  if (ts_config_with_inkdiskio) {
-    for (int i = 0; i < nos_write_threads; i++) {
-      request = (AIO_Reqs *) xmalloc(sizeof(AIO_Reqs));
-      memset(request, 0, sizeof(AIO_Reqs));
-
-      INK_WRITE_MEMORY_BARRIER;
-
-      // ink_cond_init(&request->aio_cond);
-      // ink_mutex_init(&request->aio_mutex, NULL);
-      request->list_mutex = new_ProxyMutex();
-      ink_atomiclist_init(&request->aio_temp_list, "temp_list", (unsigned) &((AIOCallback *) 0)->link);
-
-      request->index = 0;
-      request->filedes = -1;
-      aio_reqs[nos_write_threads * thread->id + i] = request;
-      DiskWriteMonitor *dm = new DiskWriteMonitor(new_ProxyMutex(),
-                                                  nos_write_threads * thread->id + i);
-      eventProcessor.spawn_thread(dm);
-    }
-  }
-}
-#endif
-
-
-
-////////////////////////////////////////////////////////////////////////
-////////        Stats            Stuff                            //////
-////////////////////////////////////////////////////////////////////////
+/*
+ * Stats
+ */
 
 static int
-aio_stats_cb(const char *name, RecDataT data_type, RecData * data, RecRawStatBlock * rsb, int id)
+aio_stats_cb(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
 {
   (void) data_type;
   (void) rsb;
@@ -272,7 +71,6 @@
   RecGetGlobalRawStatSum(aio_rsb, id, &sum);
   RecGetGlobalRawStatCount(aio_rsb, id, &count);
 
-
   ink64 time_diff = ink_hrtime_to_msec(now - count);
   if (time_diff == 0) {
     data->rec_float = 0.0;
@@ -282,18 +80,15 @@
   case AIO_STAT_READ_PER_SEC:
     new_val = aio_num_read;
     break;
-
   case AIO_STAT_WRITE_PER_SEC:
     new_val = aio_num_write;
     break;
-
   case AIO_STAT_KB_READ_PER_SEC:
     new_val = aio_bytes_read >> 10;
     break;
   case AIO_STAT_KB_WRITE_PER_SEC:
     new_val = aio_bytes_written >> 10;
     break;
-
   default:
     ink_assert(0);
   }
@@ -314,13 +109,10 @@
 int
 AIOTestData::ink_aio_stats(int event, void *d)
 {
-
   ink_hrtime now = ink_get_hrtime();
   double time_msec = (double) (now - start) / (double) HRTIME_MSECOND;
-  for (int i = 0; i < num_filedes; i++) {
-
+  for (int i = 0; i < num_filedes; i++)
     printf("%0.2f\t%i\t%i\t%i\n", time_msec, aio_reqs[i]->filedes, aio_reqs[i]->pending, aio_reqs[i]->queued);
-  }
   printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);
   eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
   return EVENT_DONE;
@@ -328,9 +120,9 @@
 
 #endif // AIO_STATS
 
-////////////////////////////////////////////////////////////////////////
-////////        Common           Stuff                            //////
-////////////////////////////////////////////////////////////////////////
+/*
+ * Common
+ */
 AIOCallback *
 new_AIOCallback(void)
 {
@@ -338,7 +130,7 @@
 };
 
 void
-ink_aio_set_callback(Continuation * callback)
+ink_aio_set_callback(Continuation *callback)
 {
   aio_err_callbck = callback;
 }
@@ -360,45 +152,14 @@
                      "proxy.process.cache.KB_write_per_sec",
                      RECD_FLOAT, RECP_NULL, (int) AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
 
-#ifdef INKDISKAIO
-  kcb_offset = eventProcessor.allocate(sizeof(INKAIOCB *));
-  dm_offset = eventProcessor.allocate(sizeof(Continuation *));
-  ink_assert(dm_offset - kcb_offset >= sizeof(Continuation *));
-  dmid_offset = eventProcessor.allocate(sizeof(int));
-  ink_assert(dmid_offset - dm_offset >= sizeof(int));
-  IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.net.enable_ink_disk_io", 0, RECU_RESTART_TS, RECC_NULL, NULL);
-
-  IOCORE_ReadConfigInteger(ts_config_with_inkdiskio, "proxy.config.net.enable_ink_disk_io");
-
-  IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.net.ink_disk_io_watermark", 400, RECU_NULL, RECC_NULL, NULL);
-  IOCORE_ReadConfigInteger(inkdiskio_watermark, "proxy.config.net.ink_disk_io_watermark");
-
-  IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.net.ink_aio_write_threads", 10, RECU_NULL, RECC_NULL, NULL);
-  IOCORE_ReadConfigInteger(nos_write_threads, "proxy.config.net.ink_aio_write_threads");
-
-  if (ts_config_with_inkdiskio) {
-    if (FILE * fp = fopen("/dev/inkaio", "rw")) {
-      fclose(fp);
-    } else {
-      Warning("Inkio for disk cannot be enabled: /dev/kcalls does not exist");
-      ts_config_with_inkdiskio = 0;
-    }
-  }
-#endif
   if (!ts_config_with_inkdiskio) {
     memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
     ink_mutex_init(&insert_mutex, NULL);
     IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.cache.threads_per_disk", 4, RECU_NULL, RECC_NULL, NULL);
     IOCORE_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
-
-    IOCORE_RegisterConfigInteger(RECT_CONFIG, "proxy.config.cache.aio_sleep_time", 100, RECU_DYNAMIC, RECC_NULL, NULL);
-    IOCORE_ReadConfigInteger(cache_config_aio_sleep_time, "proxy.config.cache.aio_sleep_time");
   }
-
-
 }
 
-
 int
 ink_aio_start()
 {
@@ -410,10 +171,6 @@
 }
 
 
-////////////////////////////////////////////////////////////////////////
-////////        Unix             Stuff                            //////
-////////////////////////////////////////////////////////////////////////
-
 static void *aio_thread_main(void *arg);
 
 struct AIOThreadInfo:public Continuation
@@ -422,7 +179,7 @@
   AIO_Reqs *req;
   int sleep_wait;
 
-  int start(int event, Event * e)
+  int start(int event, Event *e)
   {
     (void) event;
     (void) e;
@@ -430,7 +187,7 @@
     return EVENT_DONE;
   }
 
-  AIOThreadInfo(AIO_Reqs * thr_req, int sleep):Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
+  AIOThreadInfo(AIO_Reqs *thr_req, int sleep):Continuation(new_ProxyMutex()), req(thr_req), sleep_wait(sleep)
   {
     SET_HANDLER(&AIOThreadInfo::start);
   }
@@ -489,12 +246,8 @@
 /* insert a request into either aio_todo or http_todo queue. aio_todo
    list is kept sorted */
 static void
-aio_insert(AIOCallback * op, AIO_Reqs * req)
+aio_insert(AIOCallback *op, AIO_Reqs *req)
 {
-#ifdef INKDISKAIO
-  op->aiocb.aio_reqprio = AIO_LOWEST_PRIORITY;
-#endif
-
 #ifdef AIO_STATS
   num_requests++;
   req->queued++;
@@ -519,15 +272,12 @@
 
     /* Either the queue was empty or this request has the highest priority */
     req->aio_todo.push(op);
-
   }
-  return;
-
 }
 
 /* move the request from the atomic list to the queue */
 static void
-aio_move(AIO_Reqs * req)
+aio_move(AIO_Reqs *req)
 {
   AIOCallback *next = NULL, *prev = NULL, *cb = (AIOCallback *) ink_atomiclist_popall(&req->aio_temp_list);
   /* flip the list */
@@ -551,7 +301,7 @@
 
 /* queue the new request */
 static void
-aio_queue_req(AIOCallbackInternal * op)
+aio_queue_req(AIOCallbackInternal *op)
 {
   int thread_ndx = 0;
   AIO_Reqs *req = op->aio_req;
@@ -560,22 +310,6 @@
 #ifdef AIO_STATS
   ink_atomic_increment((int *) &data->num_req, 1);
 #endif
-#ifdef INKDISKAIO
-  if (ts_config_with_inkdiskio) {
-    if (DISK_WRITE_THREAD) {
-      req = aio_reqs[MY_AIO_THREAD];
-      ink_assert(req);
-    } else {
-      req = aio_reqs[my_aio_thread()];
-      if (!req) {
-        initialize_thread_for_diskaio(this_ethread());
-        req = aio_reqs[my_aio_thread()];
-      }
-      ink_assert(req);
-    }
-    goto Lacquirelock;
-  }
-#endif
   if (!req || req->filedes != op->aiocb.aio_fildes) {
     /* search for the matching file descriptor */
     for (; thread_ndx < num_filedes; thread_ndx++) {
@@ -608,13 +342,7 @@
     }
     op->aio_req = req;
   }
-#ifndef INKDISKAIO
-  ink_assert(req->filedes == op->aiocb.aio_fildes);
-#endif
   ink_atomic_increment(&req->requests_queued, 1);
-#ifdef INKDISKAIO
-Lacquirelock:
-#endif
   if (!ink_mutex_try_acquire(&req->aio_mutex)) {
 #ifdef AIO_STATS
     ink_atomic_increment(&data->num_temp, 1);
@@ -635,18 +363,13 @@
 }
 
 static inline int
-cache_op(AIOCallbackInternal * op)
+cache_op(AIOCallbackInternal *op)
 {
   bool read = (op->aiocb.aio_lio_opcode == LIO_READ) ? 1 : 0;
   for (; op; op = (AIOCallbackInternal *) op->then) {
     ink_aiocb_t *a = &op->aiocb;
     int err, res = 0;
 
-#ifdef DEBUG
-    if (op->sleep_time) {
-      ink_sleep(op->sleep_time);
-    }
-#endif
     while (a->aio_nbytes - res > 0) {
       do {
         if (read)
@@ -655,10 +378,8 @@
           err = ink_pwrite(a->aio_fildes, ((char *) a->aio_buf) + res, a->aio_nbytes - res, a->aio_offset + res);
       } while ((err < 0) && (errno == EINTR || errno == ENOBUFS || errno == ENOMEM));
       if (err <= 0) {
-#ifdef DIAGS_MODULARIZED
         Warning("cache disk operation failed %s %d %d\n",
                 (a->aio_lio_opcode == LIO_READ) ? "READ" : "WRITE", err, errno);
-#endif
         op->aio_result = -errno;
         return (err);
       }
@@ -671,7 +392,7 @@
 }
 
 int
-ink_aio_read(AIOCallback * op)
+ink_aio_read(AIOCallback *op)
 {
   op->aiocb.aio_lio_opcode = LIO_READ;
   switch (AIO_MODE) {
@@ -690,24 +411,15 @@
     cache_op((AIOCallbackInternal *) op);
     op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
     break;
-  case AIO_MODE_THREAD:{
-      aio_queue_req((AIOCallbackInternal *) op);
-      break;
-    }
-#if 0
-  case AIO_MODE_INK:
-    ink_disk_read(op->aiocb.aio_fildes, (char *) op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset, op);
-    break;
-  default:
-    ASSERT(0);
+  case AIO_MODE_THREAD:
+    aio_queue_req((AIOCallbackInternal *) op);
     break;
-#endif
   }
   return 1;
 }
 
 int
-ink_aio_write(AIOCallback * op)
+ink_aio_write(AIOCallback *op)
 {
   op->aiocb.aio_lio_opcode = LIO_WRITE;
   switch (AIO_MODE) {
@@ -726,18 +438,9 @@
     cache_op((AIOCallbackInternal *) op);
     op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
     break;
-  case AIO_MODE_THREAD:{
-      aio_queue_req((AIOCallbackInternal *) op);
-      break;
-    }
-#if 0
-  case IO_MODE_INK:
-    ink_disk_write(op->aiocb.aio_fildes, (char *) op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset, op);
-    break;
-  default:
-    ASSERT(0);
+  case AIO_MODE_THREAD:
+    aio_queue_req((AIOCallbackInternal *) op);
     break;
-#endif
   }
   return 1;
 }
@@ -747,7 +450,6 @@
 {
   AIOThreadInfo *thr_info = (AIOThreadInfo *) arg;
   AIO_Reqs *my_aio_req = (AIO_Reqs *) thr_info->req;
-  int timed_wait = thr_info->sleep_wait;
   AIO_Reqs *current_req = NULL;
   AIOCallback *op = NULL;
   ink_mutex_acquire(&my_aio_req->aio_mutex);
@@ -787,37 +489,18 @@
 #endif
       op->link.prev = NULL;
       op->link.next = NULL;
-      // make op continuation share op->action's mutex
       op->mutex = op->action.mutex;
-      /* why do we callback on AIO thread only if its a write??
-         See INKqa07855. The problem is that with a lot of users,
-         the Net threads take a lot of time (as high as a second)
-         to come back to ink_aio_complete. This means that the 
-         partition can only issue 1 i/o per second. To 
-         get around this problem, we have the aio threads callback
-         the partitions directly. The partition issues another i/o
-         on the same thread (assuming there is enough stuff to be 
-         written). The partition is careful not to callback the VC's 
-         and not schedule any events on the thread.
-         It does not matter for reads because its generally the 
-         CacheVC's that issue reads and they have to do a fair
-         bit of computation (go through the docheader, call http
-         state machine back, etc) before they can issue another
-         read.
-       */
-      if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
+      if (op->thread == AIO_CALLBACK_THREAD_AIO) {
         MUTEX_LOCK(lock, op->mutex, thr_info->mutex->thread_holding);
         if (!op->action.cancelled)
           op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
-      } else
+      } else if (op->thread == AIO_CALLBACK_THREAD_ANY)
+        eventProcessor.schedule_imm(op);
+      else
         op->thread->schedule_imm(op);
       ink_mutex_acquire(&my_aio_req->aio_mutex);
     } while (1);
-    if (timed_wait) {
-      timespec ts = ink_based_hrtime_to_timespec(ink_get_hrtime() + HRTIME_MSECONDS(cache_config_aio_sleep_time));
-      ink_cond_timedwait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex, &ts);
-    } else
-      ink_cond_wait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex);
+    ink_cond_wait(&my_aio_req->aio_cond, &my_aio_req->aio_mutex);
   }
   return 0;
 }

Modified: incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/aio/I_AIO.h Tue Jan  5 21:39:25 2010
@@ -50,11 +50,14 @@
 #define AIO_MODE_AIO             0
 #define AIO_MODE_SYNC            1
 #define AIO_MODE_THREAD          2
-#define AIO_MODE_INK             3
 #define AIO_MODE                 AIO_MODE_THREAD
 
+// AIOCallback::thread special values
+#define AIO_CALLBACK_THREAD_ANY ((EThread*)0) // any regular event thread
+#define AIO_CALLBACK_THREAD_AIO ((EThread*)-1)
+
 #define AIO_LOWEST_PRIORITY      0
-#define AIO_DEFAULT_PRIORITY     AIO_LOWEST_PRIORTY
+#define AIO_DEFAULT_PRIORITY     AIO_LOWEST_PRIORITY
 
 struct AIOCallback:Continuation
 {
@@ -67,8 +70,9 @@
   int aio_result;
 
   int ok();
-  // AIOCallback();
-  virtual void AIOCallback_is_an_abstract_class() = 0;
+  AIOCallback() : thread(AIO_CALLBACK_THREAD_ANY), then(0) {
+    aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+  }
 };
 
 void ink_aio_init(ModuleVersion version);

Modified: incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/aio/P_AIO.h Tue Jan  5 21:39:25 2010
@@ -39,7 +39,8 @@
 #include "P_EventSystem.h"
 #include "I_AIO.h"
 
-// #define AIO_STATS
+// for debugging
+// #define AIO_STATS 1
 
 #undef  AIO_MODULE_VERSION
 #define AIO_MODULE_VERSION        makeModuleVersion(AIO_MODULE_MAJOR_VERSION,\
@@ -53,9 +54,6 @@
   AIO_Reqs *aio_req;
   ink_hrtime sleep_time;
   int io_complete(int event, void *data);
-  void AIOCallback_is_an_abstract_class()
-  {
-  }
   AIOCallbackInternal()
   {
     const size_t to_zero = sizeof(AIOCallbackInternal)
@@ -90,9 +88,6 @@
   /* Atomic list to temporarily hold the request if the
      lock for a particular queue cannot be acquired */
   InkAtomicList aio_temp_list;
-#ifdef INKDISKAIO
-  ProxyMutexPtr list_mutex;
-#endif
   ink_mutex aio_mutex;
   ink_cond aio_cond;
   int index;                    /* position of this struct in the aio_reqs array */
@@ -110,55 +105,15 @@
   int num_temp;
   int num_queue;
   ink_hrtime start;
-    AIOTestData():Continuation(new_ProxyMutex()), num_req(0), num_temp(0), num_queue(0)
-  {
-    start = ink_get_hrtime();
-    SET_HANDLER(&AIOTestData::ink_aio_stats);
-  }
 
   int ink_aio_stats(int event, void *data);
-};
-#endif
 
-
-#ifdef INKDISKAIO
-#include "inkaio.h"
-void initialize_thread_for_diskaio(EThread *);
-struct AIOMissEvent:Continuation
-{
-  AIOCallback *cb;
-
-  int mainEvent(int event, Event * e)
+  AIOTestData():Continuation(new_ProxyMutex()), num_req(0), num_temp(0), num_queue(0)
   {
-    if (!cb->action.cancelled)
-      cb->action.continuation->handleEvent(AIO_EVENT_DONE, cb);
-    delete this;
-      return EVENT_DONE;
-  }
-
-  AIOMissEvent(ProxyMutex * amutex, AIOCallback * acb)
-  : Continuation(amutex), cb(acb)
-  {
-    SET_HANDLER(&AIOMissEvent::mainEvent);
+    start = ink_get_hrtime();
+    SET_HANDLER(&AIOTestData::ink_aio_stats);
   }
 };
-static ink_off_t kcb_offset, dm_offset, dmid_offset;
-inline INKAIOCB *
-get_kcb(EThread * t)
-{
-  return *((INKAIOCB **) ETHREAD_GET_PTR(t, kcb_offset));
-}
-
-inline Continuation *
-get_dm(EThread * t)
-{
-  return *((Continuation **) ETHREAD_GET_PTR(t, dm_offset));
-}
-inline int
-get_dmid(EThread * t)
-{
-  return *((int *) ETHREAD_GET_PTR(t, dmid_offset));
-}
 #endif
 
 enum aio_stat_enum

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/Cache.cc Tue Jan  5 21:39:25 2010
@@ -955,12 +955,11 @@
   SET_HANDLER(&Part::handle_dir_clear);
 
   io.aiocb.aio_fildes = fd;
-  io.aiocb.aio_reqprio = 0;
   io.aiocb.aio_buf = raw_dir;
   io.aiocb.aio_nbytes = dir_len;
   io.aiocb.aio_offset = skip;
   io.action = this;
-  io.thread = this_ethread();
+  io.thread = AIO_CALLBACK_THREAD_ANY;
   io.then = 0;
   ink_assert(ink_aio_write(&io));
   return 0;
@@ -1038,7 +1037,6 @@
   for (i = 0; i < 4; i++) {
     AIOCallback *aio = &(init_info->part_aio[i]);
     aio->aiocb.aio_fildes = fd;
-    aio->aiocb.aio_reqprio = 0;
     aio->aiocb.aio_buf = &(init_info->part_h_f[i * INK_BLOCK_SIZE]);
     aio->aiocb.aio_nbytes = footerlen;
     aio->action = this;
@@ -1365,9 +1363,8 @@
     for (int i = 0; i < 3; i++) {
       AIOCallback *aio = &(init_info->part_aio[i]);
       aio->aiocb.aio_fildes = fd;
-      aio->aiocb.aio_reqprio = 0;
       aio->action = this;
-      aio->thread = this_ethread();
+      aio->thread = AIO_CALLBACK_THREAD_ANY;
       aio->then = (i < 2) ? &(init_info->part_aio[i + 1]) : 0;
     }
     int footerlen = ROUND_TO_BLOCK(sizeof(PartHeaderFooter));
@@ -1438,11 +1435,10 @@
     }
 
     io.aiocb.aio_fildes = fd;
-    io.aiocb.aio_reqprio = 0;
     io.aiocb.aio_nbytes = part_dirlen(this);
     io.aiocb.aio_buf = raw_dir;
     io.action = this;
-    io.thread = this_ethread();
+    io.thread = AIO_CALLBACK_THREAD_ANY;
     io.then = 0;
 
     if (hf[0]->sync_serial == hf[1]->sync_serial &&

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDir.cc Tue Jan  5 21:39:25 2010
@@ -887,7 +887,7 @@
   io.aiocb.aio_nbytes = n;
   io.aiocb.aio_buf = b;
   io.action = this;
-  io.thread = mutex->thread_holding;
+  io.thread = AIO_CALLBACK_THREAD_ANY;
   ink_assert(ink_aio_write(&io) >= 0);
 }
 

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheDisk.cc Tue Jan  5 21:39:25 2010
@@ -67,20 +67,17 @@
     return clearDisk();
   }
 
-
   SET_HANDLER(&CacheDisk::openStart);
   io.aiocb.aio_offset = skip;
   io.aiocb.aio_buf = (char *) header;
   io.aiocb.aio_nbytes = header_len;
-
-  io.thread = this_ethread();
+  io.thread = AIO_CALLBACK_THREAD_ANY;
   ink_aio_read(&io);
   return 0;
 }
 
 CacheDisk::~CacheDisk()
 {
-
   if (path) {
     xfree(path);
     for (int i = 0; i < (int) header->num_partitions; i++) {
@@ -104,13 +101,12 @@
 int
 CacheDisk::clearDisk()
 {
-
   delete_all_partitions();
 
   io.aiocb.aio_offset = skip;
   io.aiocb.aio_buf = header;
   io.aiocb.aio_nbytes = header_len;
-  io.thread = this_ethread();
+  io.thread = AIO_CALLBACK_THREAD_ANY;
   ink_aio_write(&io);
   return 0;
 }
@@ -157,7 +153,6 @@
 
   SET_HANDLER(&CacheDisk::openDone);
   return openDone(EVENT_IMMEDIATE, 0);
-
 }
 
 int
@@ -178,11 +173,10 @@
 int
 CacheDisk::sync()
 {
-
   io.aiocb.aio_offset = skip;
   io.aiocb.aio_buf = header;
   io.aiocb.aio_nbytes = header_len;
-  io.thread = this_ethread();
+  io.thread = AIO_CALLBACK_THREAD_ANY;
   ink_aio_write(&io);
   return 0;
 }
@@ -207,7 +201,6 @@
 DiskPartBlock *
 CacheDisk::create_partition(int number, ink_off_t size_in_blocks, int scheme)
 {
-
   if (size_in_blocks == 0)
     return NULL;
 
@@ -233,7 +226,6 @@
     }
   }
 
-
   if (!p && !closest_match)
     return NULL;
 
@@ -267,7 +259,6 @@
     free_blocks->size += dpb->len;
     free_space += dpb->len;
     header->num_diskpart_blks++;
-
   } else
     header->num_free--;
 
@@ -296,7 +287,6 @@
     disk_parts[i]->size = q->b->len;
     header->num_partitions++;
   }
-
   return p;
 }
 
@@ -304,7 +294,6 @@
 int
 CacheDisk::delete_partition(int number)
 {
-
   unsigned int i;
   for (i = 0; i < header->num_partitions; i++) {
     if (disk_parts[i]->part_number == number) {
@@ -340,7 +329,6 @@
 void
 CacheDisk::update_header()
 {
-
   unsigned int n = 0;
   unsigned int i, j;
   if (free_blocks) {
@@ -397,14 +385,12 @@
     }
   }
 
-
   ink_assert(n == header->num_partitions);
 }
 
 DiskPart *
 CacheDisk::get_diskpart(int part_number)
 {
-
   unsigned int i;
   for (i = 0; i < header->num_partitions; i++) {
     if (disk_parts[i]->part_number == part_number) {
@@ -417,7 +403,6 @@
 int
 CacheDisk::delete_all_partitions()
 {
-
   header->part_info[0].offset = start;
   header->part_info[0].len = num_usable_blocks;
   header->part_info[0].type = CACHE_NONE_TYPE;

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CachePart.cc Tue Jan  5 21:39:25 2010
@@ -125,7 +125,7 @@
     io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
     io.aiocb.aio_buf = buf->data();
     io.action = this;
-    io.thread = mutex->thread_holding;
+    io.thread = AIO_CALLBACK_THREAD_ANY;
     goto Lread;
   }
 

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheRead.cc Tue Jan  5 21:39:25 2010
@@ -297,13 +297,11 @@
   CACHE_TRY_LOCK(lock, part->mutex, mutex->thread_holding);
   if (!lock)
     VC_SCHED_LOCK_RETRY();
+  od = part->open_read(&first_key); // recheck in case the lock failed
   if (!od) {
-    od = part->open_read(&first_key);
-    if (!od) {
-      write_vc = NULL;
-      SET_HANDLER(&CacheVC::openReadStartHead);
-      return openReadStartHead(event, e);
-    }
+    write_vc = NULL;
+    SET_HANDLER(&CacheVC::openReadStartHead);
+    return openReadStartHead(event, e);
   } else
     ink_debug_assert(od == part->open_read(&first_key));
   if (!write_vc) {
@@ -419,8 +417,8 @@
   writer_buf = write_vc->blocks;
   writer_offset = write_vc->offset;
   length = write_vc->length;
-  //copy the vector
-  f.single_fragment = !write_vc->fragment;        //single fragment doc
+  // copy the vector
+  f.single_fragment = !write_vc->fragment;        // single fragment doc
   docpos = 0;
   earliest_key = write_vc->earliest_key;
   ink_assert(earliest_key == key);
@@ -428,14 +426,11 @@
   dir_clean(&first_dir);
   dir_clean(&earliest_dir);
   Debug("cache_read_agg", "%x key: %X %X: single fragment read", first_key.word(1), key.word(0));
-
   MUTEX_RELEASE(writer_lock);
-  //we've got everything....ready to roll!!
   SET_HANDLER(&CacheVC::openReadFromWriterMain);
   CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
   _action.continuation->handleEvent(CACHE_EVENT_OPEN_READ, (void *) this);
   return EVENT_DONE;
-
 #endif //READ_WHILE_WRITER
 }
 

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/CacheWrite.cc Tue Jan  5 21:39:25 2010
@@ -729,7 +729,7 @@
 
       io.aiocb.aio_buf = doc_evacuator->buf->data();
       io.action = this;
-      io.thread = mutex->thread_holding;
+      io.thread = AIO_CALLBACK_THREAD_ANY;
       Debug("cache_evac", "evac_range evacuating %X %d", (int) dir_tag(&first->dir), (int) dir_offset(&first->dir));
       SET_HANDLER(&Part::evacuateDocReadDone);
       ink_assert(ink_aio_read(&io) >= 0);
@@ -1040,7 +1040,12 @@
   io.aiocb.aio_buf = agg_buffer;
   io.aiocb.aio_nbytes = agg_buf_pos;
   io.action = this;
-  io.thread = mutex->thread_holding;
+  /*
+    Call back on the AIO thread so that we can issue a new write ASAP,
+    as all writes are serialized in the partition.  This is not necessary
+    for reads, which proceed independently.
+   */
+  io.thread = AIO_CALLBACK_THREAD_AIO;
   SET_HANDLER(&Part::aggWriteDone);
   ink_aio_write(&io);
 

Modified: incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h
URL: http://svn.apache.org/viewvc/incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h?rev=896234&r1=896233&r2=896234&view=diff
==============================================================================
--- incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h (original)
+++ incubator/trafficserver/traffic/branches/dev/iocore/cache/P_CacheInternal.h Tue Jan  5 21:39:25 2010
@@ -528,6 +528,7 @@
   cont->io.mutex.clear();
   cont->io.aio_result = 0;
   cont->io.aiocb.aio_nbytes = 0;
+  cont->io.aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
 #ifdef HTTP_CACHE
   cont->request.reset();
   cont->vector.clear();