You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zy...@apache.org on 2013/04/03 09:35:29 UTC

git commit: TS-1760: Option to use Linux native AIO

Updated Branches:
  refs/heads/master 84df57e0e -> c95298dfc


TS-1760: Option to use Linux native AIO

To enable Linux native AIO, make sure your Linux kernel supports AIO
and pass the '--enable-linux-native-aio' option to configure.

With Linux native AIO, all I/O is managed by the system, so
proxy.config.cache.threads_per_disk no longer has any effect.


Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c95298df
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c95298df
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c95298df

Branch: refs/heads/master
Commit: c95298dfcc6336cc04f172713f5d41e023d19d51
Parents: 84df57e
Author: weijin <ta...@taobao.com>
Authored: Wed Apr 3 14:59:48 2013 +0800
Committer: Zhao Yongming <mi...@gmail.com>
Committed: Wed Apr 3 15:34:53 2013 +0800

----------------------------------------------------------------------
 configure.ac             |   19 +++++
 iocore/aio/AIO.cc        |  146 +++++++++++++++++++++++++++++++++++++++--
 iocore/aio/I_AIO.h       |  129 ++++++++++++++++++++++++++++++++++++-
 iocore/aio/P_AIO.h       |   59 +++++++++++++++--
 iocore/cache/Cache.cc    |   74 ++++++++++++++++++++-
 iocore/cache/CacheVol.cc |    2 +-
 lib/ts/ink_aiocb.h       |    4 +-
 lib/ts/ink_config.h.in   |    1 +
 proxy/InkAPI.cc          |    4 +
 9 files changed, 421 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 7b30f26..72e60b0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -398,6 +398,25 @@ TS_ARG_ENABLE_VAR([use], [reclaimable_freelist])
 AC_SUBST(use_reclaimable_freelist)
 
 #
+# If the OS is linux, we can use '--use_linux_native_aio' option to
+# replace the aio thread mode. Effective only on the linux system.
+#
+
+if test "x${host_os_def}" = "xlinux"; then
+  AC_MSG_CHECKING([whether to use native aio or not])
+  AC_ARG_ENABLE([linux_native_aio],
+    [AS_HELP_STRING([--enable-linux-native-aio],
+      [turn on linux native aio, only effective on linux system])],
+    [],
+    [enable_linux_native_aio="yes"])
+  AC_MSG_RESULT([$enable_linux_native_aio])
+else
+  enable_linux_native_aio="no"
+fi
+TS_ARG_ENABLE_VAR([use], [linux_native_aio])
+AC_SUBST(use_linux_native_aio)
+
+
 # Configure how many stats to allocate for plugins. Default is 512.
 #
 AC_ARG_WITH([max-api-stats],

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/AIO.cc
----------------------------------------------------------------------
diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
index 3086f0c..40bf1b1 100644
--- a/iocore/aio/AIO.cc
+++ b/iocore/aio/AIO.cc
@@ -27,6 +27,10 @@
 
 #include "P_AIO.h"
 
+#if AIO_MODE == AIO_MODE_NATIVE
+#define AIO_PERIOD                                -HRTIME_MSECONDS(4)
+#else
+
 #define MAX_DISKS_POSSIBLE 100
 
 // globals
@@ -36,24 +40,24 @@ int ts_config_with_inkdiskio = 0;
 AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
 /* number of unique file descriptors in the aio_reqs array */
 volatile int num_filedes = 1;
-RecRawStatBlock *aio_rsb = NULL;
+
 // acquire this mutex before inserting a new entry in the aio_reqs array.
 // Don't need to acquire this for searching the array
 static ink_mutex insert_mutex;
-Continuation *aio_err_callbck = 0;
+
 RecInt cache_config_threads_per_disk = 12;
 RecInt api_config_threads_per_disk = 12;
 int thread_is_created = 0;
+#endif // AIO_MODE == AIO_MODE_NATIVE
 
-
+RecRawStatBlock *aio_rsb = NULL;
+Continuation *aio_err_callbck = 0;
 // AIO Stats
 uint64_t aio_num_read = 0;
 uint64_t aio_bytes_read = 0;
 uint64_t aio_num_write = 0;
 uint64_t aio_bytes_written = 0;
 
-static void aio_move(AIO_Reqs *req);
-
 /*
  * Stats
  */
@@ -156,10 +160,12 @@ ink_aio_init(ModuleVersion v)
   RecRegisterRawStat(aio_rsb, RECT_PROCESS,
                      "proxy.process.cache.KB_write_per_sec",
                      RECD_FLOAT, RECP_NULL, (int) AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
+#if AIO_MODE != AIO_MODE_NATIVE
   memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
   ink_mutex_init(&insert_mutex, NULL);
 
   REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
+#endif
 }
 
 int
@@ -172,6 +178,7 @@ ink_aio_start()
   return 0;
 }
 
+#if  AIO_MODE != AIO_MODE_NATIVE
 
 static void *aio_thread_main(void *arg);
 
@@ -534,3 +541,132 @@ aio_thread_main(void *arg)
   }
   return 0;
 }
+#else
+int
+DiskHandler::startAIOEvent(int event, Event *e) {
+  SET_HANDLER(&DiskHandler::mainAIOEvent);
+  e->schedule_every(AIO_PERIOD);
+  trigger_event = e;
+  return EVENT_CONT;
+}
+
+int
+DiskHandler::mainAIOEvent(int event, Event *e) {
+  AIOCallback *op = NULL;
+Lagain:
+  int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);
+  //printf("%d\n", ret);
+  for (int i = 0; i < ret; i++) {
+    op = (AIOCallback *) events[i].data;
+    op->aio_result = events[i].res;
+    ink_assert(op->action.continuation);
+    complete_list.enqueue(op);
+    //op->handleEvent(event, e);
+  }
+  if (ret == MAX_AIO_EVENTS)
+    goto Lagain;
+  if (ret < 0)
+    perror("io_getevents");
+
+  ink_aiocb_t *cbs[MAX_AIO_EVENTS];
+  int num = 0;
+  for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != NULL); ++num) {
+    cbs[num] = &op->aiocb;
+    ink_debug_assert(op->action.continuation);
+  }
+  if (num > 0) {
+    int ret;
+    do {
+      ret = io_submit(ctx, num, cbs);
+    } while (ret < 0 && errno == EAGAIN);
+
+    if (ret != num) {
+      if (ret < 0)
+        perror("io_submit error");
+      else {
+        fprintf(stderr, "could not sumbit IOs");
+        ink_debug_assert(0);
+      }
+    }
+  }
+
+  while ((op = complete_list.dequeue()) != NULL) {
+    op->handleEvent(event, e);
+  }
+  return EVENT_CONT;
+}
+
+int
+ink_aio_read(AIOCallback *op, int fromAPI) {
+  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+  op->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
+  op->aiocb.aio_data = op;
+  this_ethread()->diskHandler->ready_list.enqueue(op);
+
+  return 1;
+}
+
+int
+ink_aio_write(AIOCallback *op, int fromAPI) {
+  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+  op->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
+  op->aiocb.aio_data = op;
+  this_ethread()->diskHandler->ready_list.enqueue(op);
+
+  return 1;
+}
+
+int
+ink_aio_readv(AIOCallback *op, int fromAPI) {
+  DiskHandler *dh = this_ethread()->diskHandler;
+  AIOCallback *io = op;
+  int sz = 0;
+
+  while (io) {
+    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+    io->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
+    io->aiocb.aio_data = io;
+    dh->ready_list.enqueue(io);
+    ++sz;
+    io = io->then;
+  }
+
+  if (sz > 1) {
+    ink_debug_assert(op->action.continuation);
+    AIOVec *vec = new AIOVec(sz, op->action.continuation);
+    vec->action = op->action.continuation;
+    while (--sz >= 0) {
+      op->action = vec;
+      op = op->then;
+    }
+  }
+  return 1;
+}
+
+int
+ink_aio_writev(AIOCallback *op, int fromAPI) {
+  DiskHandler *dh = this_ethread()->diskHandler;
+  AIOCallback *io = op;
+  int sz = 0;
+
+  while (io) {
+    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+    io->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
+    io->aiocb.aio_data = io;
+    dh->ready_list.enqueue(io);
+    ++sz;
+    io = io->then;
+  }
+
+  if (sz > 1) {
+    ink_debug_assert(op->action.continuation);
+    AIOVec *vec = new AIOVec(sz, op->action.continuation);
+    vec->action = op->action.continuation;
+    while (--sz >= 0) {
+      op->action = vec;
+      op = op->then;
+    }
+  }
+  return 1;
+}
+#endif // AIO_MODE != AIO_MODE_NATIVE

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/I_AIO.h
----------------------------------------------------------------------
diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h
index 8d4cd2b..ccdc078 100644
--- a/iocore/aio/I_AIO.h
+++ b/iocore/aio/I_AIO.h
@@ -46,8 +46,114 @@
 #define AIO_MODE_AIO             0
 #define AIO_MODE_SYNC            1
 #define AIO_MODE_THREAD          2
+#define AIO_MODE_NATIVE          3
+#if use_linux_native_aio
+#define AIO_MODE                 AIO_MODE_NATIVE
+#else
 #define AIO_MODE                 AIO_MODE_THREAD
+#endif
+
+#if AIO_MODE == AIO_MODE_NATIVE
+
+#include <sys/syscall.h>  /* for __NR_* definitions */
+#include <linux/aio_abi.h>  /* for AIO types and constants */
+#define MAX_AIO_EVENTS 1024
+
+#if defined(__LITTLE_ENDIAN)
+#if (SIZEOF_VOID_POINTER == 4)
+#define PADDEDPtr(x, y) x; unsigned y
+#define PADDEDul(x, y) unsigned long x; unsigned y
+#elif (SIZEOF_VOID_POINTER == 8)
+#define PADDEDPtr(x, y) x
+#define PADDEDul(x, y) unsigned long x
+#endif
+#elif defined(__BIG_ENDIAN)
+#if (SIZEOF_VOID_POINTER == 4)
+#define PADDEDPtr(x, y) unsigned y; x
+#define PADDEDul(x, y) unsigned y; unsigned long y
+#elif (SIZEOF_VOID_POINTER == 8)
+#define PADDEDPtr(x, y) x
+#define PADDEDul(x, y) unsigned long x
+#endif
+#else
+#error edit for your odd byteorder.
+#endif
+
+typedef struct ink_iocb {
+  /* these are internal to the kernel/libc. */
+  PADDEDPtr(void *aio_data, _pad1); /* data to be returned in event's data */
+  unsigned PADDED(aio_key, aio_reserved1);
+        /* the kernel sets aio_key to the req # */
+
+  /* common fields */
+  short aio_lio_opcode; /* see IOCB_CMD_ above */
+  short aio_reqprio;
+  int aio_fildes;
+
+  PADDEDPtr(void *aio_buf, _pad2);
+  PADDEDul(aio_nbytes, _pad3);
+  int64_t aio_offset;
+
+  /* extra parameters */
+  uint64_t aio_reserved2;  /* TODO: use this for a (struct sigevent *) */
+
+  /* flags for the "struct iocb" */
+  int aio_flags;
+
+  /*
+   * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
+   * eventfd to signal AIO readiness to
+   */
+  int aio_resfd;
+
+} ink_aiocb_t;
+
+typedef struct ink_io_event {
+  PADDEDPtr(void *data, _pad1);   /* the data field from the iocb */
+  PADDEDPtr(ink_aiocb_t *obj, _pad2);    /* what iocb this event came from */
+  PADDEDul(res, _pad3);    /* result code for this event */
+  PADDEDul(res2, _pad4);   /* secondary result */
+} ink_io_event_t;
+
+TS_INLINE int io_setup(unsigned nr, aio_context_t *ctxp)
+{
+  return syscall(__NR_io_setup, nr, ctxp);
+}
+
+TS_INLINE int io_destroy(aio_context_t ctx)
+{
+  return syscall(__NR_io_destroy, ctx);
+}
+
+TS_INLINE int io_submit(aio_context_t ctx, long nr,  ink_aiocb_t **iocbpp)
+{
+  return syscall(__NR_io_submit, ctx, nr, iocbpp);
+}
+
+TS_INLINE int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
+    ink_io_event_t *events, struct timespec *timeout)
+{
+  return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
+}
+
+struct AIOVec: public Continuation
+{
+  Action action;
+  int size;
+  int completed;
+
+  AIOVec(int sz, Continuation *c): Continuation(new_ProxyMutex()), size(sz), completed(0)
+  {
+    action = c;
+    SET_HANDLER(&AIOVec::mainEvent);
+  }
 
+  int mainEvent(int event, Event *e);
+};
+#else
+typedef ink_aiocb ink_aiocb_t;
+bool ink_aio_thread_num_set(int thread_num);
+#endif
 // AIOCallback::thread special values
 #define AIO_CALLBACK_THREAD_ANY ((EThread*)0) // any regular event thread
 #define AIO_CALLBACK_THREAD_AIO ((EThread*)-1)
@@ -71,12 +177,33 @@ struct AIOCallback: public Continuation
   }
 };
 
+#if AIO_MODE == AIO_MODE_NATIVE
+struct DiskHandler: public Continuation
+{
+  Event *trigger_event;
+  aio_context_t ctx;
+  ink_io_event_t events[MAX_AIO_EVENTS];
+  Que(AIOCallback, link) ready_list;
+  Que(AIOCallback, link) complete_list;
+  int startAIOEvent(int event, Event *e);
+  int mainAIOEvent(int event, Event *e);
+  DiskHandler() {
+    SET_HANDLER(&DiskHandler::startAIOEvent);
+    memset(&ctx, 0, sizeof(aio_context_t));
+    int ret = io_setup(MAX_AIO_EVENTS, &ctx);
+    if (ret < 0) {
+      perror("io_setup error");
+    }
+  }
+};
+#endif
 void ink_aio_init(ModuleVersion version);
 int ink_aio_start();
 void ink_aio_set_callback(Continuation * error_callback);
 
 int ink_aio_read(AIOCallback *op, int fromAPI = 0);   // fromAPI is a boolean to indicate if this is from a API call such as upload proxy feature
 int ink_aio_write(AIOCallback *op, int fromAPI = 0);
-bool ink_aio_thread_num_set(int thread_num);
+int ink_aio_readv(AIOCallback *op, int fromAPI = 0);   // fromAPI is a boolean to indicate if this is from a API call such as upload proxy feature
+int ink_aio_writev(AIOCallback *op, int fromAPI = 0);
 AIOCallback *new_AIOCallback(void);
 #endif

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/P_AIO.h
----------------------------------------------------------------------
diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h
index 2d686e3..9e3321c 100644
--- a/iocore/aio/P_AIO.h
+++ b/iocore/aio/P_AIO.h
@@ -41,6 +41,58 @@
 #define AIO_MODULE_VERSION        makeModuleVersion(AIO_MODULE_MAJOR_VERSION,\
 						    AIO_MODULE_MINOR_VERSION,\
 						    PRIVATE_MODULE_HEADER)
+
+TS_INLINE int
+AIOCallback::ok()
+{
+  return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
+}
+
+#if AIO_MODE == AIO_MODE_NATIVE
+
+extern Continuation *aio_err_callbck;
+
+struct AIOCallbackInternal: public AIOCallback
+{
+  int io_complete(int event, void *data);
+  AIOCallbackInternal()
+  {
+    memset ((char *) &(this->aiocb), 0, sizeof(this->aiocb));
+    SET_HANDLER(&AIOCallbackInternal::io_complete);
+  }
+};
+
+TS_INLINE int
+AIOCallbackInternal::io_complete(int event, void *data)
+{
+  (void) event;
+  (void) data;
+
+  if (!ok() && aio_err_callbck)
+    eventProcessor.schedule_imm(aio_err_callbck, ET_CALL, AIO_EVENT_DONE);
+  mutex = action.mutex;
+  MUTEX_LOCK(lock, mutex, this_ethread());
+  if (!action.cancelled)
+    action.continuation->handleEvent(AIO_EVENT_DONE, this);
+  return EVENT_DONE;
+}
+
+TS_INLINE int
+AIOVec::mainEvent(int event, Event *e) {
+  ++completed;
+  if (completed < size)
+    return EVENT_CONT;
+  else if (completed == size) {
+    MUTEX_LOCK(lock, action.mutex, this_ethread());
+    if (!action.cancelled)
+      action.continuation->handleEvent(AIO_EVENT_DONE, this);
+    delete this;
+    return EVENT_DONE;
+  }
+  ink_assert(!"AIOVec mainEvent err");
+  return EVENT_ERROR;
+}
+#else
 struct AIO_Reqs;
 
 struct AIOCallbackInternal: public AIOCallback
@@ -61,12 +113,6 @@ struct AIOCallbackInternal: public AIOCallback
 };
 
 TS_INLINE int
-AIOCallback::ok()
-{
-  return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
-}
-
-TS_INLINE int
 AIOCallbackInternal::io_complete(int event, void *data)
 {
   (void) event;
@@ -92,6 +138,7 @@ struct AIO_Reqs
   volatile int requests_queued;
 };
 
+#endif // AIO_MODE == AIO_MODE_NATIVE
 #ifdef AIO_STATS
 class AIOTestData:public Continuation
 {

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/Cache.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
index 1232a11..d5ba29f 100644
--- a/iocore/cache/Cache.cc
+++ b/iocore/cache/Cache.cc
@@ -146,6 +146,51 @@ struct VolInitInfo
   }
 };
 
+#if AIO_MODE == AIO_MODE_NATIVE
+struct VolInit : public Continuation
+{
+  Vol *vol;
+  char *path;
+  off_t blocks;
+  int64_t offset;
+  bool vol_clear;
+
+  int mainEvent(int event, Event *e) {
+    vol->init(path, blocks, offset, vol_clear);
+    mutex.clear();
+    delete this;
+    return EVENT_DONE;
+  }
+
+  VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) : Continuation(v->mutex),
+    vol(v), path(p), blocks(b), offset(o), vol_clear(c) {
+    SET_HANDLER(&VolInit::mainEvent);
+  }
+};
+
+struct DiskInit : public Continuation
+{
+  CacheDisk *disk;
+  char *s;
+  off_t blocks;
+  off_t askip;
+  int ahw_sector_size;
+  int fildes;
+  bool clear;
+
+  int mainEvent(int event, Event *e) {
+    disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
+    mutex.clear();
+    delete this;
+    return EVENT_DONE;
+  }
+
+  DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector, int f, bool c) : Continuation(d->mutex),
+      disk(d), s(str), blocks(b), askip(skip), ahw_sector_size(sector), fildes(f), clear(c) {
+    SET_HANDLER(&DiskInit::mainEvent);
+  }
+};
+#endif
 void cplist_init();
 static void cplist_update();
 int cplist_reconfigure();
@@ -530,6 +575,16 @@ CacheProcessor::start_internal(int flags)
   verify_cache_api();
 #endif
 
+#if AIO_MODE == AIO_MODE_NATIVE
+  int etype = ET_NET;
+  int n_netthreads = eventProcessor.n_threads_for_type[etype];
+  EThread **netthreads = eventProcessor.eventthread[etype];
+  for (int i = 0; i < n_netthreads; ++i) {
+    netthreads[i]->diskHandler = new DiskHandler();
+    netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
+  }
+#endif
+
   start_internal_flags = flags;
   clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
   fix = !!(flags & PROCESSOR_FIX);
@@ -593,7 +648,11 @@ CacheProcessor::start_internal(int flags)
         }
         off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset));
         blocks = blocks - ROUND_TO_STORE_BLOCK(sd->offset + skip);
+#if AIO_MODE == AIO_MODE_NATIVE
+        eventProcessor.schedule_imm(NEW(new DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd, clear)));
+#else
         gdisks[gndisks]->open(path, blocks, skip, sector_size, fd, clear);
+#endif
         gndisks++;
       }
     } else {
@@ -1109,8 +1168,11 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
     aio->thread = AIO_CALLBACK_THREAD_ANY;
     aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : 0;
   }
-
+#if AIO_MODE == AIO_MODE_NATIVE
+  ink_assert(ink_aio_readv(init_info->vol_aio));
+#else
   ink_assert(ink_aio_read(init_info->vol_aio));
+#endif
   return 0;
 }
 
@@ -1440,7 +1502,11 @@ Ldone:{
     init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
 
     SET_HANDLER(&Vol::handle_recover_write_dir);
+#if AIO_MODE == AIO_MODE_NATIVE
+    ink_assert(ink_aio_writev(init_info->vol_aio));
+#else
     ink_assert(ink_aio_write(init_info->vol_aio));
+#endif
     return EVENT_CONT;
   }
 
@@ -1812,7 +1878,11 @@ Cache::open(bool clear, bool fix) {
             blocks = q->b->len;
 
             bool vol_clear = clear || d->cleared || q->new_block;
+#if AIO_MODE == AIO_MODE_NATIVE
+            eventProcessor.schedule_imm(NEW(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear)));
+#else
             cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear);
+#endif
             vol_no++;
             cache_size += blocks;
           }
@@ -1926,7 +1996,7 @@ CacheVC::handleReadDone(int event, Event *e) {
         if (checksum != doc->checksum) {
           Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
                doc->first_key.b[0], doc->first_key.b[1],
-               doc->len, doc->hlen, vol->path, io.aiocb.aio_offset, io.aiocb.aio_nbytes);
+               doc->len, doc->hlen, vol->path, io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
           doc->magic = DOC_CORRUPT;
           okay = 0;
         }

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/CacheVol.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
index bf4fad9..ccd2009 100644
--- a/iocore/cache/CacheVol.cc
+++ b/iocore/cache/CacheVol.cc
@@ -365,7 +365,7 @@ Lread:
   offset = 0;
   ink_assert(ink_aio_read(&io) >= 0);
   Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu", this,
-        (int64_t)io.aiocb.aio_offset, io.aiocb.aio_nbytes);
+        (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
   return EVENT_CONT;
 
 Ldone:

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_aiocb.h
----------------------------------------------------------------------
diff --git a/lib/ts/ink_aiocb.h b/lib/ts/ink_aiocb.h
index 1cf9d38..543243b 100644
--- a/lib/ts/ink_aiocb.h
+++ b/lib/ts/ink_aiocb.h
@@ -39,7 +39,7 @@
 #define	LIO_READ	0x1
 #define	LIO_WRITE	0x2
 
-typedef struct ink_aiocb
+struct ink_aiocb
 {
   int aio_fildes;
 #if	defined(__STDC__)
@@ -58,6 +58,6 @@ typedef struct ink_aiocb
   //    aio_result_t    aio_resultp;    /* results */
   int aio_state;                /* state flag for List I/O */
   int aio__pad[1];              /* extension padding */
-} ink_aiocb_t;
+};
 
 #endif

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_config.h.in
----------------------------------------------------------------------
diff --git a/lib/ts/ink_config.h.in b/lib/ts/ink_config.h.in
index 6b0525c..93e393c 100644
--- a/lib/ts/ink_config.h.in
+++ b/lib/ts/ink_config.h.in
@@ -122,6 +122,7 @@
 #define TS_USE_RECLAIMABLE_FREELIST    @use_reclaimable_freelist@
 #define TS_USE_TLS_NPN                 @use_tls_npn@
 #define TS_USE_TLS_SNI                 @use_tls_sni@
+#define TS_USE_LINUX_NATIVE_AIO        @use_linux_native_aio@
 
 /* OS API definitions */
 #define GETHOSTBYNAME_R_HOSTENT_DATA   @gethostbyname_r_hostent_data@

http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/proxy/InkAPI.cc
----------------------------------------------------------------------
diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
index 661a2d0..fd8258e 100644
--- a/proxy/InkAPI.cc
+++ b/proxy/InkAPI.cc
@@ -7324,10 +7324,14 @@ TSAIOWrite(int fd, off_t offset, char* buf, const size_t bufSize, TSCont contp)
 TSReturnCode
 TSAIOThreadNumSet(int thread_num)
 {
+#if AIO_MODE == AIO_MODE_NATIVE
+  return TS_SUCCESS;
+#else
   if (ink_aio_thread_num_set(thread_num))
     return TS_SUCCESS;
 
   return TS_ERROR;
+#endif
 }
 
 void


Re: git commit: TS-1760: Option to use Linux native AIO

Posted by Igor Galić <i....@brainsware.org>.

----- Original Message -----
> Updated Branches:
>   refs/heads/master 84df57e0e -> c95298dfc
> 
> 
> TS-1760: Option to use Linux native AIO
> 
> To enable Linux native AIO, make sure your Linux kernel supports AIO
> and pass the '--enable-linux-native-aio' option to configure.
> 
> With Linux native AIO, all I/O is managed by the system, so
> proxy.config.cache.threads_per_disk no longer has any effect.
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c95298df
> Tree:
> http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c95298df
> Diff:
> http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c95298df
> 
> Branch: refs/heads/master
> Commit: c95298dfcc6336cc04f172713f5d41e023d19d51
> Parents: 84df57e
> Author: weijin <ta...@taobao.com>
> Authored: Wed Apr 3 14:59:48 2013 +0800
> Committer: Zhao Yongming <mi...@gmail.com>
> Committed: Wed Apr 3 15:34:53 2013 +0800
> 
> ----------------------------------------------------------------------
>  configure.ac             |   19 +++++
>  iocore/aio/AIO.cc        |  146
>  +++++++++++++++++++++++++++++++++++++++--
>  iocore/aio/I_AIO.h       |  129
>  ++++++++++++++++++++++++++++++++++++-
>  iocore/aio/P_AIO.h       |   59 +++++++++++++++--
>  iocore/cache/Cache.cc    |   74 ++++++++++++++++++++-
>  iocore/cache/CacheVol.cc |    2 +-
>  lib/ts/ink_aiocb.h       |    4 +-
>  lib/ts/ink_config.h.in   |    1 +
>  proxy/InkAPI.cc          |    4 +
>  9 files changed, 421 insertions(+), 17 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/configure.ac
> ----------------------------------------------------------------------
> diff --git a/configure.ac b/configure.ac
> index 7b30f26..72e60b0 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -398,6 +398,25 @@ TS_ARG_ENABLE_VAR([use], [reclaimable_freelist])
>  AC_SUBST(use_reclaimable_freelist)
>  
>  #
> +# If the OS is linux, we can use '--use_linux_native_aio' option to
> +# replace the aio thread mode. Effective only on the linux system.
> +#
> +
> +if test "x${host_os_def}" = "xlinux"; then
> +  AC_MSG_CHECKING([whether to use native aio or not])
> +  AC_ARG_ENABLE([linux_native_aio],
> +    [AS_HELP_STRING([--enable-linux-native-aio],
> +      [turn on linux native aio, only effective on linux system])],
> +    [],
> +    [enable_linux_native_aio="yes"])
> +  AC_MSG_RESULT([$enable_linux_native_aio])
> +else
> +  enable_linux_native_aio="no"
> +fi
> +TS_ARG_ENABLE_VAR([use], [linux_native_aio])
> +AC_SUBST(use_linux_native_aio)
> +
> +
>  # Configure how many stats to allocate for plugins. Default is 512.
>  #
>  AC_ARG_WITH([max-api-stats],
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/AIO.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
> index 3086f0c..40bf1b1 100644
> --- a/iocore/aio/AIO.cc
> +++ b/iocore/aio/AIO.cc
> @@ -27,6 +27,10 @@
>  
>  #include "P_AIO.h"
>  
> +#if AIO_MODE == AIO_MODE_NATIVE
> +#define AIO_PERIOD
> -HRTIME_MSECONDS(4)
> +#else
> +
>  #define MAX_DISKS_POSSIBLE 100
>  
>  // globals
> @@ -36,24 +40,24 @@ int ts_config_with_inkdiskio = 0;
>  AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
>  /* number of unique file descriptors in the aio_reqs array */
>  volatile int num_filedes = 1;
> -RecRawStatBlock *aio_rsb = NULL;
> +
>  // acquire this mutex before inserting a new entry in the aio_reqs
>  array.
>  // Don't need to acquire this for searching the array
>  static ink_mutex insert_mutex;
> -Continuation *aio_err_callbck = 0;
> +
>  RecInt cache_config_threads_per_disk = 12;
>  RecInt api_config_threads_per_disk = 12;
>  int thread_is_created = 0;
> +#endif // AIO_MODE == AIO_MODE_NATIVE
>  
> -
> +RecRawStatBlock *aio_rsb = NULL;
> +Continuation *aio_err_callbck = 0;
>  // AIO Stats
>  uint64_t aio_num_read = 0;
>  uint64_t aio_bytes_read = 0;
>  uint64_t aio_num_write = 0;
>  uint64_t aio_bytes_written = 0;
>  
> -static void aio_move(AIO_Reqs *req);
> -
>  /*
>   * Stats
>   */
> @@ -156,10 +160,12 @@ ink_aio_init(ModuleVersion v)
>    RecRegisterRawStat(aio_rsb, RECT_PROCESS,
>                       "proxy.process.cache.KB_write_per_sec",
>                       RECD_FLOAT, RECP_NULL, (int)
>                       AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
> +#if AIO_MODE != AIO_MODE_NATIVE
>    memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
>    ink_mutex_init(&insert_mutex, NULL);
>  
>    REC_ReadConfigInteger(cache_config_threads_per_disk,
>    "proxy.config.cache.threads_per_disk");
> +#endif
>  }
>  
>  int
> @@ -172,6 +178,7 @@ ink_aio_start()
>    return 0;
>  }
>  
> +#if  AIO_MODE != AIO_MODE_NATIVE
>  
>  static void *aio_thread_main(void *arg);
>  
> @@ -534,3 +541,132 @@ aio_thread_main(void *arg)
>    }
>    return 0;
>  }
> +#else
> +int
> +DiskHandler::startAIOEvent(int event, Event *e) {
> +  SET_HANDLER(&DiskHandler::mainAIOEvent);
> +  e->schedule_every(AIO_PERIOD);
> +  trigger_event = e;
> +  return EVENT_CONT;
> +}
> +

> +int
> +DiskHandler::mainAIOEvent(int event, Event *e) {
> +  AIOCallback *op = NULL;
> +Lagain:
> +  int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);
> +  //printf("%d\n", ret);
> +  for (int i = 0; i < ret; i++) {
> +    op = (AIOCallback *) events[i].data;
> +    op->aio_result = events[i].res;
> +    ink_assert(op->action.continuation);
> +    complete_list.enqueue(op);
> +    //op->handleEvent(event, e);
> +  }
> +  if (ret == MAX_AIO_EVENTS)
> +    goto Lagain;

Why is this not a simple `do { ... } while (ret == MAX_AIO_EVENTS);` loop instead of a label and goto?

> +  if (ret < 0)
> +    perror("io_getevents");
> +
> +  ink_aiocb_t *cbs[MAX_AIO_EVENTS];
> +  int num = 0;
> +  for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) !=
> NULL); ++num) {
> +    cbs[num] = &op->aiocb;
> +    ink_debug_assert(op->action.continuation);
> +  }
> +  if (num > 0) {
> +    int ret;
> +    do {
> +      ret = io_submit(ctx, num, cbs);
> +    } while (ret < 0 && errno == EAGAIN);
> +
> +    if (ret != num) {
> +      if (ret < 0)
> +        perror("io_submit error");
> +      else {
> +        fprintf(stderr, "could not sumbit IOs");
> +        ink_debug_assert(0);
> +      }
> +    }
> +  }
> +
> +  while ((op = complete_list.dequeue()) != NULL) {
> +    op->handleEvent(event, e);
> +  }
> +  return EVENT_CONT;
> +}
> +
> +int
> +ink_aio_read(AIOCallback *op, int fromAPI) {
> +  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> +  op->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
> +  op->aiocb.aio_data = op;
> +  this_ethread()->diskHandler->ready_list.enqueue(op);
> +
> +  return 1;
> +}
> +
> +int
> +ink_aio_write(AIOCallback *op, int fromAPI) {
> +  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> +  op->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
> +  op->aiocb.aio_data = op;
> +  this_ethread()->diskHandler->ready_list.enqueue(op);
> +
> +  return 1;
> +}
> +
> +int
> +ink_aio_readv(AIOCallback *op, int fromAPI) {
> +  DiskHandler *dh = this_ethread()->diskHandler;
> +  AIOCallback *io = op;
> +  int sz = 0;
> +
> +  while (io) {
> +    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> +    io->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
> +    io->aiocb.aio_data = io;
> +    dh->ready_list.enqueue(io);
> +    ++sz;
> +    io = io->then;
> +  }
> +
> +  if (sz > 1) {
> +    ink_debug_assert(op->action.continuation);
> +    AIOVec *vec = new AIOVec(sz, op->action.continuation);
> +    vec->action = op->action.continuation;
> +    while (--sz >= 0) {
> +      op->action = vec;
> +      op = op->then;
> +    }
> +  }
> +  return 1;
> +}
> +
> +int
> +ink_aio_writev(AIOCallback *op, int fromAPI) {
> +  DiskHandler *dh = this_ethread()->diskHandler;
> +  AIOCallback *io = op;
> +  int sz = 0;
> +
> +  while (io) {
> +    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> +    io->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
> +    io->aiocb.aio_data = io;
> +    dh->ready_list.enqueue(io);
> +    ++sz;
> +    io = io->then;
> +  }
> +
> +  if (sz > 1) {
> +    ink_debug_assert(op->action.continuation);
> +    AIOVec *vec = new AIOVec(sz, op->action.continuation);
> +    vec->action = op->action.continuation;
> +    while (--sz >= 0) {
> +      op->action = vec;
> +      op = op->then;
> +    }
> +  }
> +  return 1;
> +}
> +#endif // AIO_MODE != AIO_MODE_NATIVE
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/I_AIO.h
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h
> index 8d4cd2b..ccdc078 100644
> --- a/iocore/aio/I_AIO.h
> +++ b/iocore/aio/I_AIO.h
> @@ -46,8 +46,114 @@
>  #define AIO_MODE_AIO             0
>  #define AIO_MODE_SYNC            1
>  #define AIO_MODE_THREAD          2
> +#define AIO_MODE_NATIVE          3
> +#if use_linux_native_aio
> +#define AIO_MODE                 AIO_MODE_NATIVE
> +#else
>  #define AIO_MODE                 AIO_MODE_THREAD
> +#endif
> +
> +#if AIO_MODE == AIO_MODE_NATIVE
> +
> +#include <sys/syscall.h>  /* for __NR_* definitions */
> +#include <linux/aio_abi.h>  /* for AIO types and constants */
> +#define MAX_AIO_EVENTS 1024
> +
> +#if defined(__LITTLE_ENDIAN)
> +#if (SIZEOF_VOID_POINTER == 4)
> +#define PADDEDPtr(x, y) x; unsigned y
> +#define PADDEDul(x, y) unsigned long x; unsigned y
> +#elif (SIZEOF_VOID_POINTER == 8)
> +#define PADDEDPtr(x, y) x
> +#define PADDEDul(x, y) unsigned long x
> +#endif
> +#elif defined(__BIG_ENDIAN)
> +#if (SIZEOF_VOID_POINTER == 4)
> +#define PADDEDPtr(x, y) unsigned y; x
> +#define PADDEDul(x, y) unsigned y; unsigned long y
> +#elif (SIZEOF_VOID_POINTER == 8)
> +#define PADDEDPtr(x, y) x
> +#define PADDEDul(x, y) unsigned long x
> +#endif
> +#else
> +#error edit for your odd byteorder.
> +#endif
> +
> +typedef struct ink_iocb {
> +  /* these are internal to the kernel/libc. */
> +  PADDEDPtr(void *aio_data, _pad1); /* data to be returned in
> event's data */
> +  unsigned PADDED(aio_key, aio_reserved1);
> +        /* the kernel sets aio_key to the req # */
> +
> +  /* common fields */
> +  short aio_lio_opcode; /* see IOCB_CMD_ above */
> +  short aio_reqprio;
> +  int aio_fildes;
> +
> +  PADDEDPtr(void *aio_buf, _pad2);
> +  PADDEDul(aio_nbytes, _pad3);
> +  int64_t aio_offset;
> +
> +  /* extra parameters */
> +  uint64_t aio_reserved2;  /* TODO: use this for a (struct sigevent
> *) */
> +
> +  /* flags for the "struct iocb" */
> +  int aio_flags;
> +
> +  /*
> +   * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
> +   * eventfd to signal AIO readiness to
> +   */
> +  int aio_resfd;
> +
> +} ink_aiocb_t;
> +
> +typedef struct ink_io_event {
> +  PADDEDPtr(void *data, _pad1);   /* the data field from the iocb */
> +  PADDEDPtr(ink_aiocb_t *obj, _pad2);    /* what iocb this event
> came from */
> +  PADDEDul(res, _pad3);    /* result code for this event */
> +  PADDEDul(res2, _pad4);   /* secondary result */
> +} ink_io_event_t;
> +
> +TS_INLINE int io_setup(unsigned nr, aio_context_t *ctxp)
> +{
> +  return syscall(__NR_io_setup, nr, ctxp);
> +}
> +
> +TS_INLINE int io_destroy(aio_context_t ctx)
> +{
> +  return syscall(__NR_io_destroy, ctx);
> +}
> +
> +TS_INLINE int io_submit(aio_context_t ctx, long nr,  ink_aiocb_t
> **iocbpp)
> +{
> +  return syscall(__NR_io_submit, ctx, nr, iocbpp);
> +}
> +
> +TS_INLINE int io_getevents(aio_context_t ctx, long min_nr, long
> max_nr,
> +    ink_io_event_t *events, struct timespec *timeout)
> +{
> +  return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events,
> timeout);
> +}
> +
> +struct AIOVec: public Continuation
> +{
> +  Action action;
> +  int size;
> +  int completed;
> +
> +  AIOVec(int sz, Continuation *c): Continuation(new_ProxyMutex()),
> size(sz), completed(0)
> +  {
> +    action = c;
> +    SET_HANDLER(&AIOVec::mainEvent);
> +  }
>  
> +  int mainEvent(int event, Event *e);
> +};
> +#else
> +typedef ink_aiocb ink_aiocb_t;
> +bool ink_aio_thread_num_set(int thread_num);
> +#endif
>  // AIOCallback::thread special values
>  #define AIO_CALLBACK_THREAD_ANY ((EThread*)0) // any regular event
>  thread
>  #define AIO_CALLBACK_THREAD_AIO ((EThread*)-1)
> @@ -71,12 +177,33 @@ struct AIOCallback: public Continuation
>    }
>  };
>  
> +#if AIO_MODE == AIO_MODE_NATIVE
> +struct DiskHandler: public Continuation
> +{
> +  Event *trigger_event;
> +  aio_context_t ctx;
> +  ink_io_event_t events[MAX_AIO_EVENTS];
> +  Que(AIOCallback, link) ready_list;
> +  Que(AIOCallback, link) complete_list;
> +  int startAIOEvent(int event, Event *e);
> +  int mainAIOEvent(int event, Event *e);
> +  DiskHandler() {
> +    SET_HANDLER(&DiskHandler::startAIOEvent);
> +    memset(&ctx, 0, sizeof(aio_context_t));
> +    int ret = io_setup(MAX_AIO_EVENTS, &ctx);
> +    if (ret < 0) {
> +      perror("io_setup error");
> +    }
> +  }
> +};
> +#endif
>  void ink_aio_init(ModuleVersion version);
>  int ink_aio_start();
>  void ink_aio_set_callback(Continuation * error_callback);
>  
>  int ink_aio_read(AIOCallback *op, int fromAPI = 0);   // fromAPI is
>  a boolean to indicate if this is from a API call such as upload
>  proxy feature
>  int ink_aio_write(AIOCallback *op, int fromAPI = 0);
> -bool ink_aio_thread_num_set(int thread_num);
> +int ink_aio_readv(AIOCallback *op, int fromAPI = 0);   // fromAPI is
> a boolean to indicate if this is from a API call such as upload
> proxy feature
> +int ink_aio_writev(AIOCallback *op, int fromAPI = 0);
>  AIOCallback *new_AIOCallback(void);
>  #endif
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/P_AIO.h
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h
> index 2d686e3..9e3321c 100644
> --- a/iocore/aio/P_AIO.h
> +++ b/iocore/aio/P_AIO.h
> @@ -41,6 +41,58 @@
>  #define AIO_MODULE_VERSION
>  makeModuleVersion(AIO_MODULE_MAJOR_VERSION,\
>  						    AIO_MODULE_MINOR_VERSION,\
>  						    PRIVATE_MODULE_HEADER)
> +
> +TS_INLINE int
> +AIOCallback::ok()
> +{
> +  return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
> +}
> +
> +#if AIO_MODE == AIO_MODE_NATIVE
> +
> +extern Continuation *aio_err_callbck;
> +
> +struct AIOCallbackInternal: public AIOCallback
> +{
> +  int io_complete(int event, void *data);
> +  AIOCallbackInternal()
> +  {
> +    memset ((char *) &(this->aiocb), 0, sizeof(this->aiocb));
> +    SET_HANDLER(&AIOCallbackInternal::io_complete);
> +  }
> +};
> +
> +TS_INLINE int
> +AIOCallbackInternal::io_complete(int event, void *data)
> +{
> +  (void) event;
> +  (void) data;
> +
> +  if (!ok() && aio_err_callbck)
> +    eventProcessor.schedule_imm(aio_err_callbck, ET_CALL,
> AIO_EVENT_DONE);
> +  mutex = action.mutex;
> +  MUTEX_LOCK(lock, mutex, this_ethread());
> +  if (!action.cancelled)
> +    action.continuation->handleEvent(AIO_EVENT_DONE, this);
> +  return EVENT_DONE;
> +}
> +
> +TS_INLINE int
> +AIOVec::mainEvent(int event, Event *e) {
> +  ++completed;
> +  if (completed < size)
> +    return EVENT_CONT;
> +  else if (completed == size) {
> +    MUTEX_LOCK(lock, action.mutex, this_ethread());
> +    if (!action.cancelled)
> +      action.continuation->handleEvent(AIO_EVENT_DONE, this);
> +    delete this;
> +    return EVENT_DONE;
> +  }
> +  ink_assert(!"AIOVec mainEvent err");
> +  return EVENT_ERROR;
> +}
> +#else
>  struct AIO_Reqs;
>  
>  struct AIOCallbackInternal: public AIOCallback
> @@ -61,12 +113,6 @@ struct AIOCallbackInternal: public AIOCallback
>  };
>  
>  TS_INLINE int
> -AIOCallback::ok()
> -{
> -  return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
> -}
> -
> -TS_INLINE int
>  AIOCallbackInternal::io_complete(int event, void *data)
>  {
>    (void) event;
> @@ -92,6 +138,7 @@ struct AIO_Reqs
>    volatile int requests_queued;
>  };
>  
> +#endif // AIO_MODE == AIO_MODE_NATIVE
>  #ifdef AIO_STATS
>  class AIOTestData:public Continuation
>  {
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/Cache.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
> index 1232a11..d5ba29f 100644
> --- a/iocore/cache/Cache.cc
> +++ b/iocore/cache/Cache.cc
> @@ -146,6 +146,51 @@ struct VolInitInfo
>    }
>  };
>  
> +#if AIO_MODE == AIO_MODE_NATIVE
> +struct VolInit : public Continuation
> +{
> +  Vol *vol;
> +  char *path;
> +  off_t blocks;
> +  int64_t offset;
> +  bool vol_clear;
> +
> +  int mainEvent(int event, Event *e) {
> +    vol->init(path, blocks, offset, vol_clear);
> +    mutex.clear();
> +    delete this;
> +    return EVENT_DONE;
> +  }
> +
> +  VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) :
> Continuation(v->mutex),
> +    vol(v), path(p), blocks(b), offset(o), vol_clear(c) {
> +    SET_HANDLER(&VolInit::mainEvent);
> +  }
> +};
> +
> +struct DiskInit : public Continuation
> +{
> +  CacheDisk *disk;
> +  char *s;
> +  off_t blocks;
> +  off_t askip;
> +  int ahw_sector_size;
> +  int fildes;
> +  bool clear;
> +
> +  int mainEvent(int event, Event *e) {
> +    disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
> +    mutex.clear();
> +    delete this;
> +    return EVENT_DONE;
> +  }
> +
> +  DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector,
> int f, bool c) : Continuation(d->mutex),
> +      disk(d), s(str), blocks(b), askip(skip),
> ahw_sector_size(sector), fildes(f), clear(c) {
> +    SET_HANDLER(&DiskInit::mainEvent);
> +  }
> +};
> +#endif
>  void cplist_init();
>  static void cplist_update();
>  int cplist_reconfigure();
> @@ -530,6 +575,16 @@ CacheProcessor::start_internal(int flags)
>    verify_cache_api();
>  #endif
>  
> +#if AIO_MODE == AIO_MODE_NATIVE
> +  int etype = ET_NET;
> +  int n_netthreads = eventProcessor.n_threads_for_type[etype];
> +  EThread **netthreads = eventProcessor.eventthread[etype];
> +  for (int i = 0; i < n_netthreads; ++i) {
> +    netthreads[i]->diskHandler = new DiskHandler();
> +    netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
> +  }
> +#endif
> +
>    start_internal_flags = flags;
>    clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
>    fix = !!(flags & PROCESSOR_FIX);
> @@ -593,7 +648,11 @@ CacheProcessor::start_internal(int flags)
>          }
>          off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ?
>          START_POS + sd->alignment : sd->offset));
>          blocks = blocks - ROUND_TO_STORE_BLOCK(sd->offset + skip);
> +#if AIO_MODE == AIO_MODE_NATIVE
> +        eventProcessor.schedule_imm(NEW(new
> DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd,
> clear)));
> +#else
>          gdisks[gndisks]->open(path, blocks, skip, sector_size, fd,
>          clear);
> +#endif
>          gndisks++;
>        }
>      } else {
> @@ -1109,8 +1168,11 @@ Vol::init(char *s, off_t blocks, off_t
> dir_skip, bool clear)
>      aio->thread = AIO_CALLBACK_THREAD_ANY;
>      aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : 0;
>    }
> -
> +#if AIO_MODE == AIO_MODE_NATIVE
> +  ink_assert(ink_aio_readv(init_info->vol_aio));
> +#else
>    ink_assert(ink_aio_read(init_info->vol_aio));
> +#endif
>    return 0;
>  }
>  
> @@ -1440,7 +1502,11 @@ Ldone:{
>      init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen -
>      footerlen;
>  
>      SET_HANDLER(&Vol::handle_recover_write_dir);
> +#if AIO_MODE == AIO_MODE_NATIVE
> +    ink_assert(ink_aio_writev(init_info->vol_aio));
> +#else
>      ink_assert(ink_aio_write(init_info->vol_aio));
> +#endif
>      return EVENT_CONT;
>    }
>  
> @@ -1812,7 +1878,11 @@ Cache::open(bool clear, bool fix) {
>              blocks = q->b->len;
>  
>              bool vol_clear = clear || d->cleared || q->new_block;
> +#if AIO_MODE == AIO_MODE_NATIVE
> +            eventProcessor.schedule_imm(NEW(new
> VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset,
> vol_clear)));
> +#else
>              cp->vols[vol_no]->init(d->path, blocks, q->b->offset,
>              vol_clear);
> +#endif
>              vol_no++;
>              cache_size += blocks;
>            }
> @@ -1926,7 +1996,7 @@ CacheVC::handleReadDone(int event, Event *e) {
>          if (checksum != doc->checksum) {
>            Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "]
>            len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
>                 doc->first_key.b[0], doc->first_key.b[1],
> -               doc->len, doc->hlen, vol->path, io.aiocb.aio_offset,
> io.aiocb.aio_nbytes);
> +               doc->len, doc->hlen, vol->path, io.aiocb.aio_offset,
> (size_t)io.aiocb.aio_nbytes);
>            doc->magic = DOC_CORRUPT;
>            okay = 0;
>          }
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/CacheVol.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
> index bf4fad9..ccd2009 100644
> --- a/iocore/cache/CacheVol.cc
> +++ b/iocore/cache/CacheVol.cc
> @@ -365,7 +365,7 @@ Lread:
>    offset = 0;
>    ink_assert(ink_aio_read(&io) >= 0);
>    Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu",
>    this,
> -        (int64_t)io.aiocb.aio_offset, io.aiocb.aio_nbytes);
> +        (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
>    return EVENT_CONT;
>  
>  Ldone:
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_aiocb.h
> ----------------------------------------------------------------------
> diff --git a/lib/ts/ink_aiocb.h b/lib/ts/ink_aiocb.h
> index 1cf9d38..543243b 100644
> --- a/lib/ts/ink_aiocb.h
> +++ b/lib/ts/ink_aiocb.h
> @@ -39,7 +39,7 @@
>  #define	LIO_READ	0x1
>  #define	LIO_WRITE	0x2
>  
> -typedef struct ink_aiocb
> +struct ink_aiocb
>  {
>    int aio_fildes;
>  #if	defined(__STDC__)
> @@ -58,6 +58,6 @@ typedef struct ink_aiocb
>    //    aio_result_t    aio_resultp;    /* results */
>    int aio_state;                /* state flag for List I/O */
>    int aio__pad[1];              /* extension padding */
> -} ink_aiocb_t;
> +};
>  
>  #endif
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_config.h.in
> ----------------------------------------------------------------------
> diff --git a/lib/ts/ink_config.h.in b/lib/ts/ink_config.h.in
> index 6b0525c..93e393c 100644
> --- a/lib/ts/ink_config.h.in
> +++ b/lib/ts/ink_config.h.in
> @@ -122,6 +122,7 @@
>  #define TS_USE_RECLAIMABLE_FREELIST    @use_reclaimable_freelist@
>  #define TS_USE_TLS_NPN                 @use_tls_npn@
>  #define TS_USE_TLS_SNI                 @use_tls_sni@
> +#define TS_USE_LINUX_NATIVE_AIO        @use_linux_native_aio@
>  
>  /* OS API definitions */
>  #define GETHOSTBYNAME_R_HOSTENT_DATA
>  @gethostbyname_r_hostent_data@
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/proxy/InkAPI.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
> index 661a2d0..fd8258e 100644
> --- a/proxy/InkAPI.cc
> +++ b/proxy/InkAPI.cc
> @@ -7324,10 +7324,14 @@ TSAIOWrite(int fd, off_t offset, char* buf,
> const size_t bufSize, TSCont contp)
>  TSReturnCode
>  TSAIOThreadNumSet(int thread_num)
>  {
> +#if AIO_MODE == AIO_MODE_NATIVE
> +  return TS_SUCCESS;
> +#else
>    if (ink_aio_thread_num_set(thread_num))
>      return TS_SUCCESS;
>  
>    return TS_ERROR;
> +#endif
>  }
>  
>  void
> 
> 

-- 
Igor Galić

Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE


Re: git commit: TS-1760: Option to use Linux native AIO

Posted by Igor Galić <i....@brainsware.org>.

----- Original Message -----
> Updated Branches:
>   refs/heads/master 84df57e0e -> c95298dfc
> 
> 
> TS-1760: Option to use Linux native AIO
> 
> to enable the Linux Native AIO, be sure to check Linux kernel AIO
> supporting and use '--use_linux_native_aio' configure directive.

As far as ./configure options go, --enable-linux-native-aio or --with-linux-native-aio
is probably a better choice.


> in the Linux Native AIO, all the IO is managed by system, so
> proxy.config.cache.threads_per_disk have no meaning anymore.
> 
> 
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c95298df
> Tree:
> http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c95298df
> Diff:
> http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c95298df
> 
> Branch: refs/heads/master
> Commit: c95298dfcc6336cc04f172713f5d41e023d19d51
> Parents: 84df57e
> Author: weijin <ta...@taobao.com>
> Authored: Wed Apr 3 14:59:48 2013 +0800
> Committer: Zhao Yongming <mi...@gmail.com>
> Committed: Wed Apr 3 15:34:53 2013 +0800
> 
> ----------------------------------------------------------------------
>  configure.ac             |   19 +++++
>  iocore/aio/AIO.cc        |  146
>  +++++++++++++++++++++++++++++++++++++++--
>  iocore/aio/I_AIO.h       |  129
>  ++++++++++++++++++++++++++++++++++++-
>  iocore/aio/P_AIO.h       |   59 +++++++++++++++--
>  iocore/cache/Cache.cc    |   74 ++++++++++++++++++++-
>  iocore/cache/CacheVol.cc |    2 +-
>  lib/ts/ink_aiocb.h       |    4 +-
>  lib/ts/ink_config.h.in   |    1 +
>  proxy/InkAPI.cc          |    4 +
>  9 files changed, 421 insertions(+), 17 deletions(-)
> ----------------------------------------------------------------------
> 
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/configure.ac
> ----------------------------------------------------------------------
> diff --git a/configure.ac b/configure.ac
> index 7b30f26..72e60b0 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -398,6 +398,25 @@ TS_ARG_ENABLE_VAR([use], [reclaimable_freelist])
>  AC_SUBST(use_reclaimable_freelist)
>  
>  #
> +# If the OS is linux, we can use '--use_linux_native_aio' option to
> +# replace the aio thread mode. Effective only on the linux system.
> +#
> +
> +if test "x${host_os_def}" = "xlinux"; then
> +  AC_MSG_CHECKING([whether to use native aio or not])
> +  AC_ARG_ENABLE([linux_native_aio],
> +    [AS_HELP_STRING([--enable-linux-native-aio],
> +      [turn on linux native aio, only effective on linux system])],
> +    [],
> +    [enable_linux_native_aio="yes"])
> +  AC_MSG_RESULT([$enable_linux_native_aio])
> +else
> +  enable_linux_native_aio="no"
> +fi
> +TS_ARG_ENABLE_VAR([use], [linux_native_aio])
> +AC_SUBST(use_linux_native_aio)
> +
> +
>  # Configure how many stats to allocate for plugins. Default is 512.
>  #
>  AC_ARG_WITH([max-api-stats],
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/AIO.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
> index 3086f0c..40bf1b1 100644
> --- a/iocore/aio/AIO.cc
> +++ b/iocore/aio/AIO.cc
> @@ -27,6 +27,10 @@
>  
>  #include "P_AIO.h"
>  
> +#if AIO_MODE == AIO_MODE_NATIVE
> +#define AIO_PERIOD
> -HRTIME_MSECONDS(4)
> +#else
> +
>  #define MAX_DISKS_POSSIBLE 100
>  
>  // globals
> @@ -36,24 +40,24 @@ int ts_config_with_inkdiskio = 0;
>  AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
>  /* number of unique file descriptors in the aio_reqs array */
>  volatile int num_filedes = 1;
> -RecRawStatBlock *aio_rsb = NULL;
> +
>  // acquire this mutex before inserting a new entry in the aio_reqs
>  array.
>  // Don't need to acquire this for searching the array
>  static ink_mutex insert_mutex;
> -Continuation *aio_err_callbck = 0;
> +
>  RecInt cache_config_threads_per_disk = 12;
>  RecInt api_config_threads_per_disk = 12;
>  int thread_is_created = 0;
> +#endif // AIO_MODE == AIO_MODE_NATIVE
>  
> -
> +RecRawStatBlock *aio_rsb = NULL;
> +Continuation *aio_err_callbck = 0;
>  // AIO Stats
>  uint64_t aio_num_read = 0;
>  uint64_t aio_bytes_read = 0;
>  uint64_t aio_num_write = 0;
>  uint64_t aio_bytes_written = 0;
>  
> -static void aio_move(AIO_Reqs *req);
> -
>  /*
>   * Stats
>   */
> @@ -156,10 +160,12 @@ ink_aio_init(ModuleVersion v)
>    RecRegisterRawStat(aio_rsb, RECT_PROCESS,
>                       "proxy.process.cache.KB_write_per_sec",
>                       RECD_FLOAT, RECP_NULL, (int)
>                       AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
> +#if AIO_MODE != AIO_MODE_NATIVE
>    memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
>    ink_mutex_init(&insert_mutex, NULL);
>  
>    REC_ReadConfigInteger(cache_config_threads_per_disk,
>    "proxy.config.cache.threads_per_disk");
> +#endif
>  }
>  
>  int
> @@ -172,6 +178,7 @@ ink_aio_start()
>    return 0;
>  }
>  
> +#if  AIO_MODE != AIO_MODE_NATIVE
>  
>  static void *aio_thread_main(void *arg);
>  
> @@ -534,3 +541,132 @@ aio_thread_main(void *arg)
>    }
>    return 0;
>  }
> +#else
> +int
> +DiskHandler::startAIOEvent(int event, Event *e) {
> +  SET_HANDLER(&DiskHandler::mainAIOEvent);
> +  e->schedule_every(AIO_PERIOD);
> +  trigger_event = e;
> +  return EVENT_CONT;
> +}
> +
> +int
> +DiskHandler::mainAIOEvent(int event, Event *e) {
> +  AIOCallback *op = NULL;
> +Lagain:
> +  int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);
> +  //printf("%d\n", ret);
> +  for (int i = 0; i < ret; i++) {
> +    op = (AIOCallback *) events[i].data;
> +    op->aio_result = events[i].res;
> +    ink_assert(op->action.continuation);
> +    complete_list.enqueue(op);
> +    //op->handleEvent(event, e);
> +  }
> +  if (ret == MAX_AIO_EVENTS)
> +    goto Lagain;
> +  if (ret < 0)
> +    perror("io_getevents");
> +
> +  ink_aiocb_t *cbs[MAX_AIO_EVENTS];
> +  int num = 0;
> +  for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) !=
> NULL); ++num) {
> +    cbs[num] = &op->aiocb;
> +    ink_debug_assert(op->action.continuation);
> +  }
> +  if (num > 0) {
> +    int ret;
> +    do {
> +      ret = io_submit(ctx, num, cbs);
> +    } while (ret < 0 && errno == EAGAIN);
> +
> +    if (ret != num) {
> +      if (ret < 0)
> +        perror("io_submit error");
> +      else {
> +        fprintf(stderr, "could not sumbit IOs");
> +        ink_debug_assert(0);
> +      }
> +    }
> +  }
> +
> +  while ((op = complete_list.dequeue()) != NULL) {
> +    op->handleEvent(event, e);
> +  }
> +  return EVENT_CONT;
> +}
> +
> +int
> +ink_aio_read(AIOCallback *op, int fromAPI) {
> +  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> +  op->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
> +  op->aiocb.aio_data = op;
> +  this_ethread()->diskHandler->ready_list.enqueue(op);
> +
> +  return 1;
> +}
> +
> +int
> +ink_aio_write(AIOCallback *op, int fromAPI) {
> +  op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> +  op->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
> +  op->aiocb.aio_data = op;
> +  this_ethread()->diskHandler->ready_list.enqueue(op);
> +
> +  return 1;
> +}
> +
> +int
> +ink_aio_readv(AIOCallback *op, int fromAPI) {
> +  DiskHandler *dh = this_ethread()->diskHandler;
> +  AIOCallback *io = op;
> +  int sz = 0;
> +
> +  while (io) {
> +    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> +    io->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
> +    io->aiocb.aio_data = io;
> +    dh->ready_list.enqueue(io);
> +    ++sz;
> +    io = io->then;
> +  }
> +
> +  if (sz > 1) {
> +    ink_debug_assert(op->action.continuation);
> +    AIOVec *vec = new AIOVec(sz, op->action.continuation);
> +    vec->action = op->action.continuation;
> +    while (--sz >= 0) {
> +      op->action = vec;
> +      op = op->then;
> +    }
> +  }
> +  return 1;
> +}
> +
> +int
> +ink_aio_writev(AIOCallback *op, int fromAPI) {
> +  DiskHandler *dh = this_ethread()->diskHandler;
> +  AIOCallback *io = op;
> +  int sz = 0;
> +
> +  while (io) {
> +    io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> +    io->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
> +    io->aiocb.aio_data = io;
> +    dh->ready_list.enqueue(io);
> +    ++sz;
> +    io = io->then;
> +  }
> +
> +  if (sz > 1) {
> +    ink_debug_assert(op->action.continuation);
> +    AIOVec *vec = new AIOVec(sz, op->action.continuation);
> +    vec->action = op->action.continuation;
> +    while (--sz >= 0) {
> +      op->action = vec;
> +      op = op->then;
> +    }
> +  }
> +  return 1;
> +}
> +#endif // AIO_MODE != AIO_MODE_NATIVE
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/I_AIO.h
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h
> index 8d4cd2b..ccdc078 100644
> --- a/iocore/aio/I_AIO.h
> +++ b/iocore/aio/I_AIO.h
> @@ -46,8 +46,114 @@
>  #define AIO_MODE_AIO             0
>  #define AIO_MODE_SYNC            1
>  #define AIO_MODE_THREAD          2
> +#define AIO_MODE_NATIVE          3
> +#if use_linux_native_aio
> +#define AIO_MODE                 AIO_MODE_NATIVE
> +#else
>  #define AIO_MODE                 AIO_MODE_THREAD
> +#endif
> +
> +#if AIO_MODE == AIO_MODE_NATIVE
> +
> +#include <sys/syscall.h>  /* for __NR_* definitions */
> +#include <linux/aio_abi.h>  /* for AIO types and constants */
> +#define MAX_AIO_EVENTS 1024
> +
> +#if defined(__LITTLE_ENDIAN)
> +#if (SIZEOF_VOID_POINTER == 4)
> +#define PADDEDPtr(x, y) x; unsigned y
> +#define PADDEDul(x, y) unsigned long x; unsigned y
> +#elif (SIZEOF_VOID_POINTER == 8)
> +#define PADDEDPtr(x, y) x
> +#define PADDEDul(x, y) unsigned long x
> +#endif
> +#elif defined(__BIG_ENDIAN)
> +#if (SIZEOF_VOID_POINTER == 4)
> +#define PADDEDPtr(x, y) unsigned y; x
> +#define PADDEDul(x, y) unsigned y; unsigned long y
> +#elif (SIZEOF_VOID_POINTER == 8)
> +#define PADDEDPtr(x, y) x
> +#define PADDEDul(x, y) unsigned long x
> +#endif
> +#else
> +#error edit for your odd byteorder.
> +#endif
> +
> +typedef struct ink_iocb {
> +  /* these are internal to the kernel/libc. */
> +  PADDEDPtr(void *aio_data, _pad1); /* data to be returned in
> event's data */
> +  unsigned PADDED(aio_key, aio_reserved1);
> +        /* the kernel sets aio_key to the req # */
> +
> +  /* common fields */
> +  short aio_lio_opcode; /* see IOCB_CMD_ above */
> +  short aio_reqprio;
> +  int aio_fildes;
> +
> +  PADDEDPtr(void *aio_buf, _pad2);
> +  PADDEDul(aio_nbytes, _pad3);
> +  int64_t aio_offset;
> +
> +  /* extra parameters */
> +  uint64_t aio_reserved2;  /* TODO: use this for a (struct sigevent
> *) */
> +
> +  /* flags for the "struct iocb" */
> +  int aio_flags;
> +
> +  /*
> +   * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
> +   * eventfd to signal AIO readiness to
> +   */
> +  int aio_resfd;
> +
> +} ink_aiocb_t;
> +
> +typedef struct ink_io_event {
> +  PADDEDPtr(void *data, _pad1);   /* the data field from the iocb */
> +  PADDEDPtr(ink_aiocb_t *obj, _pad2);    /* what iocb this event
> came from */
> +  PADDEDul(res, _pad3);    /* result code for this event */
> +  PADDEDul(res2, _pad4);   /* secondary result */
> +} ink_io_event_t;
> +
> +TS_INLINE int io_setup(unsigned nr, aio_context_t *ctxp)
> +{
> +  return syscall(__NR_io_setup, nr, ctxp);
> +}
> +
> +TS_INLINE int io_destroy(aio_context_t ctx)
> +{
> +  return syscall(__NR_io_destroy, ctx);
> +}
> +
> +TS_INLINE int io_submit(aio_context_t ctx, long nr,  ink_aiocb_t
> **iocbpp)
> +{
> +  return syscall(__NR_io_submit, ctx, nr, iocbpp);
> +}
> +
> +TS_INLINE int io_getevents(aio_context_t ctx, long min_nr, long
> max_nr,
> +    ink_io_event_t *events, struct timespec *timeout)
> +{
> +  return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events,
> timeout);
> +}
> +
> +struct AIOVec: public Continuation
> +{
> +  Action action;
> +  int size;
> +  int completed;
> +
> +  AIOVec(int sz, Continuation *c): Continuation(new_ProxyMutex()),
> size(sz), completed(0)
> +  {
> +    action = c;
> +    SET_HANDLER(&AIOVec::mainEvent);
> +  }
>  
> +  int mainEvent(int event, Event *e);
> +};
> +#else
> +typedef ink_aiocb ink_aiocb_t;
> +bool ink_aio_thread_num_set(int thread_num);
> +#endif
>  // AIOCallback::thread special values
>  #define AIO_CALLBACK_THREAD_ANY ((EThread*)0) // any regular event
>  thread
>  #define AIO_CALLBACK_THREAD_AIO ((EThread*)-1)
> @@ -71,12 +177,33 @@ struct AIOCallback: public Continuation
>    }
>  };
>  
> +#if AIO_MODE == AIO_MODE_NATIVE
> +struct DiskHandler: public Continuation
> +{
> +  Event *trigger_event;
> +  aio_context_t ctx;
> +  ink_io_event_t events[MAX_AIO_EVENTS];
> +  Que(AIOCallback, link) ready_list;
> +  Que(AIOCallback, link) complete_list;
> +  int startAIOEvent(int event, Event *e);
> +  int mainAIOEvent(int event, Event *e);
> +  DiskHandler() {
> +    SET_HANDLER(&DiskHandler::startAIOEvent);
> +    memset(&ctx, 0, sizeof(aio_context_t));
> +    int ret = io_setup(MAX_AIO_EVENTS, &ctx);
> +    if (ret < 0) {
> +      perror("io_setup error");
> +    }
> +  }
> +};
> +#endif
>  void ink_aio_init(ModuleVersion version);
>  int ink_aio_start();
>  void ink_aio_set_callback(Continuation * error_callback);
>  
>  int ink_aio_read(AIOCallback *op, int fromAPI = 0);   // fromAPI is
>  a boolean to indicate if this is from a API call such as upload
>  proxy feature
>  int ink_aio_write(AIOCallback *op, int fromAPI = 0);
> -bool ink_aio_thread_num_set(int thread_num);
> +int ink_aio_readv(AIOCallback *op, int fromAPI = 0);   // fromAPI is
> a boolean to indicate if this is from a API call such as upload
> proxy feature
> +int ink_aio_writev(AIOCallback *op, int fromAPI = 0);
>  AIOCallback *new_AIOCallback(void);
>  #endif
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/P_AIO.h
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h
> index 2d686e3..9e3321c 100644
> --- a/iocore/aio/P_AIO.h
> +++ b/iocore/aio/P_AIO.h
> @@ -41,6 +41,58 @@
>  #define AIO_MODULE_VERSION
>  makeModuleVersion(AIO_MODULE_MAJOR_VERSION,\
>  						    AIO_MODULE_MINOR_VERSION,\
>  						    PRIVATE_MODULE_HEADER)
> +
> +TS_INLINE int
> +AIOCallback::ok()
> +{
> +  return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
> +}
> +
> +#if AIO_MODE == AIO_MODE_NATIVE
> +
> +extern Continuation *aio_err_callbck;
> +
> +struct AIOCallbackInternal: public AIOCallback
> +{
> +  int io_complete(int event, void *data);
> +  AIOCallbackInternal()
> +  {
> +    memset ((char *) &(this->aiocb), 0, sizeof(this->aiocb));
> +    SET_HANDLER(&AIOCallbackInternal::io_complete);
> +  }
> +};
> +
> +TS_INLINE int
> +AIOCallbackInternal::io_complete(int event, void *data)
> +{
> +  (void) event;
> +  (void) data;
> +
> +  if (!ok() && aio_err_callbck)
> +    eventProcessor.schedule_imm(aio_err_callbck, ET_CALL,
> AIO_EVENT_DONE);
> +  mutex = action.mutex;
> +  MUTEX_LOCK(lock, mutex, this_ethread());
> +  if (!action.cancelled)
> +    action.continuation->handleEvent(AIO_EVENT_DONE, this);
> +  return EVENT_DONE;
> +}
> +
> +TS_INLINE int
> +AIOVec::mainEvent(int event, Event *e) {
> +  ++completed;
> +  if (completed < size)
> +    return EVENT_CONT;
> +  else if (completed == size) {
> +    MUTEX_LOCK(lock, action.mutex, this_ethread());
> +    if (!action.cancelled)
> +      action.continuation->handleEvent(AIO_EVENT_DONE, this);
> +    delete this;
> +    return EVENT_DONE;
> +  }
> +  ink_assert(!"AIOVec mainEvent err");
> +  return EVENT_ERROR;
> +}
> +#else
>  struct AIO_Reqs;
>  
>  struct AIOCallbackInternal: public AIOCallback
> @@ -61,12 +113,6 @@ struct AIOCallbackInternal: public AIOCallback
>  };
>  
>  TS_INLINE int
> -AIOCallback::ok()
> -{
> -  return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
> -}
> -
> -TS_INLINE int
>  AIOCallbackInternal::io_complete(int event, void *data)
>  {
>    (void) event;
> @@ -92,6 +138,7 @@ struct AIO_Reqs
>    volatile int requests_queued;
>  };
>  
> +#endif // AIO_MODE == AIO_MODE_NATIVE
>  #ifdef AIO_STATS
>  class AIOTestData:public Continuation
>  {
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/Cache.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
> index 1232a11..d5ba29f 100644
> --- a/iocore/cache/Cache.cc
> +++ b/iocore/cache/Cache.cc
> @@ -146,6 +146,51 @@ struct VolInitInfo
>    }
>  };
>  
> +#if AIO_MODE == AIO_MODE_NATIVE
> +struct VolInit : public Continuation
> +{
> +  Vol *vol;
> +  char *path;
> +  off_t blocks;
> +  int64_t offset;
> +  bool vol_clear;
> +
> +  int mainEvent(int event, Event *e) {
> +    vol->init(path, blocks, offset, vol_clear);
> +    mutex.clear();
> +    delete this;
> +    return EVENT_DONE;
> +  }
> +
> +  VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) :
> Continuation(v->mutex),
> +    vol(v), path(p), blocks(b), offset(o), vol_clear(c) {
> +    SET_HANDLER(&VolInit::mainEvent);
> +  }
> +};
> +
> +struct DiskInit : public Continuation
> +{
> +  CacheDisk *disk;
> +  char *s;
> +  off_t blocks;
> +  off_t askip;
> +  int ahw_sector_size;
> +  int fildes;
> +  bool clear;
> +
> +  int mainEvent(int event, Event *e) {
> +    disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
> +    mutex.clear();
> +    delete this;
> +    return EVENT_DONE;
> +  }
> +
> +  DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector,
> int f, bool c) : Continuation(d->mutex),
> +      disk(d), s(str), blocks(b), askip(skip),
> ahw_sector_size(sector), fildes(f), clear(c) {
> +    SET_HANDLER(&DiskInit::mainEvent);
> +  }
> +};
> +#endif
>  void cplist_init();
>  static void cplist_update();
>  int cplist_reconfigure();
> @@ -530,6 +575,16 @@ CacheProcessor::start_internal(int flags)
>    verify_cache_api();
>  #endif
>  
> +#if AIO_MODE == AIO_MODE_NATIVE
> +  int etype = ET_NET;
> +  int n_netthreads = eventProcessor.n_threads_for_type[etype];
> +  EThread **netthreads = eventProcessor.eventthread[etype];
> +  for (int i = 0; i < n_netthreads; ++i) {
> +    netthreads[i]->diskHandler = new DiskHandler();
> +    netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
> +  }
> +#endif
> +
>    start_internal_flags = flags;
>    clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
>    fix = !!(flags & PROCESSOR_FIX);
> @@ -593,7 +648,11 @@ CacheProcessor::start_internal(int flags)
>          }
>          off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset));
>          blocks = blocks - ROUND_TO_STORE_BLOCK(sd->offset + skip);
> +#if AIO_MODE == AIO_MODE_NATIVE
> +        eventProcessor.schedule_imm(NEW(new DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd, clear)));
> +#else
>          gdisks[gndisks]->open(path, blocks, skip, sector_size, fd, clear);
> +#endif
>          gndisks++;
>        }
>      } else {
> @@ -1109,8 +1168,11 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
>      aio->thread = AIO_CALLBACK_THREAD_ANY;
>      aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : 0;
>    }
> -
> +#if AIO_MODE == AIO_MODE_NATIVE
> +  ink_assert(ink_aio_readv(init_info->vol_aio));
> +#else
>    ink_assert(ink_aio_read(init_info->vol_aio));
> +#endif
>    return 0;
>  }
>  
> @@ -1440,7 +1502,11 @@ Ldone:{
>      init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
>  
>      SET_HANDLER(&Vol::handle_recover_write_dir);
> +#if AIO_MODE == AIO_MODE_NATIVE
> +    ink_assert(ink_aio_writev(init_info->vol_aio));
> +#else
>      ink_assert(ink_aio_write(init_info->vol_aio));
> +#endif
>      return EVENT_CONT;
>    }
>  
> @@ -1812,7 +1878,11 @@ Cache::open(bool clear, bool fix) {
>              blocks = q->b->len;
>  
>              bool vol_clear = clear || d->cleared || q->new_block;
> +#if AIO_MODE == AIO_MODE_NATIVE
> +            eventProcessor.schedule_imm(NEW(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear)));
> +#else
>              cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear);
> +#endif
>              vol_no++;
>              cache_size += blocks;
>            }
> @@ -1926,7 +1996,7 @@ CacheVC::handleReadDone(int event, Event *e) {
>          if (checksum != doc->checksum) {
>            Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
>                 doc->first_key.b[0], doc->first_key.b[1],
> -               doc->len, doc->hlen, vol->path, io.aiocb.aio_offset, io.aiocb.aio_nbytes);
> +               doc->len, doc->hlen, vol->path, io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
>            doc->magic = DOC_CORRUPT;
>            okay = 0;
>          }
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/CacheVol.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
> index bf4fad9..ccd2009 100644
> --- a/iocore/cache/CacheVol.cc
> +++ b/iocore/cache/CacheVol.cc
> @@ -365,7 +365,7 @@ Lread:
>    offset = 0;
>    ink_assert(ink_aio_read(&io) >= 0);
>    Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu", this,
> -        (int64_t)io.aiocb.aio_offset, io.aiocb.aio_nbytes);
> +        (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
>    return EVENT_CONT;
>  
>  Ldone:
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_aiocb.h
> ----------------------------------------------------------------------
> diff --git a/lib/ts/ink_aiocb.h b/lib/ts/ink_aiocb.h
> index 1cf9d38..543243b 100644
> --- a/lib/ts/ink_aiocb.h
> +++ b/lib/ts/ink_aiocb.h
> @@ -39,7 +39,7 @@
>  #define	LIO_READ	0x1
>  #define	LIO_WRITE	0x2
>  
> -typedef struct ink_aiocb
> +struct ink_aiocb
>  {
>    int aio_fildes;
>  #if	defined(__STDC__)
> @@ -58,6 +58,6 @@ typedef struct ink_aiocb
>    //    aio_result_t    aio_resultp;    /* results */
>    int aio_state;                /* state flag for List I/O */
>    int aio__pad[1];              /* extension padding */
> -} ink_aiocb_t;
> +};
>  
>  #endif
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_config.h.in
> ----------------------------------------------------------------------
> diff --git a/lib/ts/ink_config.h.in b/lib/ts/ink_config.h.in
> index 6b0525c..93e393c 100644
> --- a/lib/ts/ink_config.h.in
> +++ b/lib/ts/ink_config.h.in
> @@ -122,6 +122,7 @@
>  #define TS_USE_RECLAIMABLE_FREELIST    @use_reclaimable_freelist@
>  #define TS_USE_TLS_NPN                 @use_tls_npn@
>  #define TS_USE_TLS_SNI                 @use_tls_sni@
> +#define TS_USE_LINUX_NATIVE_AIO        @use_linux_native_aio@
>  
>  /* OS API definitions */
>  #define GETHOSTBYNAME_R_HOSTENT_DATA
>  @gethostbyname_r_hostent_data@
> 
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/proxy/InkAPI.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
> index 661a2d0..fd8258e 100644
> --- a/proxy/InkAPI.cc
> +++ b/proxy/InkAPI.cc
> @@ -7324,10 +7324,14 @@ TSAIOWrite(int fd, off_t offset, char* buf, const size_t bufSize, TSCont contp)
>  TSReturnCode
>  TSAIOThreadNumSet(int thread_num)
>  {
> +#if AIO_MODE == AIO_MODE_NATIVE
> +  return TS_SUCCESS;
> +#else
>    if (ink_aio_thread_num_set(thread_num))
>      return TS_SUCCESS;
>  
>    return TS_ERROR;
> +#endif
>  }
>  
>  void
> 
> 

-- 
Igor Galić

Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515  2EA5 4B1D 9E08 A097 C9AE