You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by zy...@apache.org on 2013/04/03 09:35:29 UTC
git commit: TS-1760: Option to use Linux native AIO
Updated Branches:
refs/heads/master 84df57e0e -> c95298dfc
TS-1760: Option to use Linux native AIO
To enable Linux native AIO, make sure the Linux kernel supports AIO
and pass the '--enable-linux-native-aio' option to configure.
With Linux native AIO, all I/O is managed by the system, so
proxy.config.cache.threads_per_disk no longer has any effect.
Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c95298df
Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c95298df
Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c95298df
Branch: refs/heads/master
Commit: c95298dfcc6336cc04f172713f5d41e023d19d51
Parents: 84df57e
Author: weijin <ta...@taobao.com>
Authored: Wed Apr 3 14:59:48 2013 +0800
Committer: Zhao Yongming <mi...@gmail.com>
Committed: Wed Apr 3 15:34:53 2013 +0800
----------------------------------------------------------------------
configure.ac | 19 +++++
iocore/aio/AIO.cc | 146 +++++++++++++++++++++++++++++++++++++++--
iocore/aio/I_AIO.h | 129 ++++++++++++++++++++++++++++++++++++-
iocore/aio/P_AIO.h | 59 +++++++++++++++--
iocore/cache/Cache.cc | 74 ++++++++++++++++++++-
iocore/cache/CacheVol.cc | 2 +-
lib/ts/ink_aiocb.h | 4 +-
lib/ts/ink_config.h.in | 1 +
proxy/InkAPI.cc | 4 +
9 files changed, 421 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/configure.ac
----------------------------------------------------------------------
diff --git a/configure.ac b/configure.ac
index 7b30f26..72e60b0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -398,6 +398,25 @@ TS_ARG_ENABLE_VAR([use], [reclaimable_freelist])
AC_SUBST(use_reclaimable_freelist)
#
+# If the OS is linux, we can use '--use_linux_native_aio' option to
+# replace the aio thread mode. Effective only on the linux system.
+#
+
+if test "x${host_os_def}" = "xlinux"; then
+ AC_MSG_CHECKING([whether to use native aio or not])
+ AC_ARG_ENABLE([linux_native_aio],
+ [AS_HELP_STRING([--enable-linux-native-aio],
+ [turn on linux native aio, only effective on linux system])],
+ [],
+ [enable_linux_native_aio="yes"])
+ AC_MSG_RESULT([$enable_linux_native_aio])
+else
+ enable_linux_native_aio="no"
+fi
+TS_ARG_ENABLE_VAR([use], [linux_native_aio])
+AC_SUBST(use_linux_native_aio)
+
+
# Configure how many stats to allocate for plugins. Default is 512.
#
AC_ARG_WITH([max-api-stats],
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/AIO.cc
----------------------------------------------------------------------
diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
index 3086f0c..40bf1b1 100644
--- a/iocore/aio/AIO.cc
+++ b/iocore/aio/AIO.cc
@@ -27,6 +27,10 @@
#include "P_AIO.h"
+#if AIO_MODE == AIO_MODE_NATIVE
+#define AIO_PERIOD -HRTIME_MSECONDS(4)
+#else
+
#define MAX_DISKS_POSSIBLE 100
// globals
@@ -36,24 +40,24 @@ int ts_config_with_inkdiskio = 0;
AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
/* number of unique file descriptors in the aio_reqs array */
volatile int num_filedes = 1;
-RecRawStatBlock *aio_rsb = NULL;
+
// acquire this mutex before inserting a new entry in the aio_reqs array.
// Don't need to acquire this for searching the array
static ink_mutex insert_mutex;
-Continuation *aio_err_callbck = 0;
+
RecInt cache_config_threads_per_disk = 12;
RecInt api_config_threads_per_disk = 12;
int thread_is_created = 0;
+#endif // AIO_MODE == AIO_MODE_NATIVE
-
+RecRawStatBlock *aio_rsb = NULL;
+Continuation *aio_err_callbck = 0;
// AIO Stats
uint64_t aio_num_read = 0;
uint64_t aio_bytes_read = 0;
uint64_t aio_num_write = 0;
uint64_t aio_bytes_written = 0;
-static void aio_move(AIO_Reqs *req);
-
/*
* Stats
*/
@@ -156,10 +160,12 @@ ink_aio_init(ModuleVersion v)
RecRegisterRawStat(aio_rsb, RECT_PROCESS,
"proxy.process.cache.KB_write_per_sec",
RECD_FLOAT, RECP_NULL, (int) AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
+#if AIO_MODE != AIO_MODE_NATIVE
memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
ink_mutex_init(&insert_mutex, NULL);
REC_ReadConfigInteger(cache_config_threads_per_disk, "proxy.config.cache.threads_per_disk");
+#endif
}
int
@@ -172,6 +178,7 @@ ink_aio_start()
return 0;
}
+#if AIO_MODE != AIO_MODE_NATIVE
static void *aio_thread_main(void *arg);
@@ -534,3 +541,132 @@ aio_thread_main(void *arg)
}
return 0;
}
+#else
+int
+DiskHandler::startAIOEvent(int event, Event *e) {
+ SET_HANDLER(&DiskHandler::mainAIOEvent);
+ e->schedule_every(AIO_PERIOD);
+ trigger_event = e;
+ return EVENT_CONT;
+}
+
+int
+DiskHandler::mainAIOEvent(int event, Event *e) {
+ AIOCallback *op = NULL;
+Lagain:
+ int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);
+ //printf("%d\n", ret);
+ for (int i = 0; i < ret; i++) {
+ op = (AIOCallback *) events[i].data;
+ op->aio_result = events[i].res;
+ ink_assert(op->action.continuation);
+ complete_list.enqueue(op);
+ //op->handleEvent(event, e);
+ }
+ if (ret == MAX_AIO_EVENTS)
+ goto Lagain;
+ if (ret < 0)
+ perror("io_getevents");
+
+ ink_aiocb_t *cbs[MAX_AIO_EVENTS];
+ int num = 0;
+ for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) != NULL); ++num) {
+ cbs[num] = &op->aiocb;
+ ink_debug_assert(op->action.continuation);
+ }
+ if (num > 0) {
+ int ret;
+ do {
+ ret = io_submit(ctx, num, cbs);
+ } while (ret < 0 && errno == EAGAIN);
+
+ if (ret != num) {
+ if (ret < 0)
+ perror("io_submit error");
+ else {
+ fprintf(stderr, "could not sumbit IOs");
+ ink_debug_assert(0);
+ }
+ }
+ }
+
+ while ((op = complete_list.dequeue()) != NULL) {
+ op->handleEvent(event, e);
+ }
+ return EVENT_CONT;
+}
+
+int
+ink_aio_read(AIOCallback *op, int fromAPI) {
+ op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+ op->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
+ op->aiocb.aio_data = op;
+ this_ethread()->diskHandler->ready_list.enqueue(op);
+
+ return 1;
+}
+
+int
+ink_aio_write(AIOCallback *op, int fromAPI) {
+ op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+ op->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
+ op->aiocb.aio_data = op;
+ this_ethread()->diskHandler->ready_list.enqueue(op);
+
+ return 1;
+}
+
+int
+ink_aio_readv(AIOCallback *op, int fromAPI) {
+ DiskHandler *dh = this_ethread()->diskHandler;
+ AIOCallback *io = op;
+ int sz = 0;
+
+ while (io) {
+ io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+ io->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
+ io->aiocb.aio_data = io;
+ dh->ready_list.enqueue(io);
+ ++sz;
+ io = io->then;
+ }
+
+ if (sz > 1) {
+ ink_debug_assert(op->action.continuation);
+ AIOVec *vec = new AIOVec(sz, op->action.continuation);
+ vec->action = op->action.continuation;
+ while (--sz >= 0) {
+ op->action = vec;
+ op = op->then;
+ }
+ }
+ return 1;
+}
+
+int
+ink_aio_writev(AIOCallback *op, int fromAPI) {
+ DiskHandler *dh = this_ethread()->diskHandler;
+ AIOCallback *io = op;
+ int sz = 0;
+
+ while (io) {
+ io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
+ io->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
+ io->aiocb.aio_data = io;
+ dh->ready_list.enqueue(io);
+ ++sz;
+ io = io->then;
+ }
+
+ if (sz > 1) {
+ ink_debug_assert(op->action.continuation);
+ AIOVec *vec = new AIOVec(sz, op->action.continuation);
+ vec->action = op->action.continuation;
+ while (--sz >= 0) {
+ op->action = vec;
+ op = op->then;
+ }
+ }
+ return 1;
+}
+#endif // AIO_MODE != AIO_MODE_NATIVE
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/I_AIO.h
----------------------------------------------------------------------
diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h
index 8d4cd2b..ccdc078 100644
--- a/iocore/aio/I_AIO.h
+++ b/iocore/aio/I_AIO.h
@@ -46,8 +46,114 @@
#define AIO_MODE_AIO 0
#define AIO_MODE_SYNC 1
#define AIO_MODE_THREAD 2
+#define AIO_MODE_NATIVE 3
+#if use_linux_native_aio
+#define AIO_MODE AIO_MODE_NATIVE
+#else
#define AIO_MODE AIO_MODE_THREAD
+#endif
+
+#if AIO_MODE == AIO_MODE_NATIVE
+
+#include <sys/syscall.h> /* for __NR_* definitions */
+#include <linux/aio_abi.h> /* for AIO types and constants */
+#define MAX_AIO_EVENTS 1024
+
+#if defined(__LITTLE_ENDIAN)
+#if (SIZEOF_VOID_POINTER == 4)
+#define PADDEDPtr(x, y) x; unsigned y
+#define PADDEDul(x, y) unsigned long x; unsigned y
+#elif (SIZEOF_VOID_POINTER == 8)
+#define PADDEDPtr(x, y) x
+#define PADDEDul(x, y) unsigned long x
+#endif
+#elif defined(__BIG_ENDIAN)
+#if (SIZEOF_VOID_POINTER == 4)
+#define PADDEDPtr(x, y) unsigned y; x
+#define PADDEDul(x, y) unsigned y; unsigned long y
+#elif (SIZEOF_VOID_POINTER == 8)
+#define PADDEDPtr(x, y) x
+#define PADDEDul(x, y) unsigned long x
+#endif
+#else
+#error edit for your odd byteorder.
+#endif
+
+typedef struct ink_iocb {
+ /* these are internal to the kernel/libc. */
+ PADDEDPtr(void *aio_data, _pad1); /* data to be returned in event's data */
+ unsigned PADDED(aio_key, aio_reserved1);
+ /* the kernel sets aio_key to the req # */
+
+ /* common fields */
+ short aio_lio_opcode; /* see IOCB_CMD_ above */
+ short aio_reqprio;
+ int aio_fildes;
+
+ PADDEDPtr(void *aio_buf, _pad2);
+ PADDEDul(aio_nbytes, _pad3);
+ int64_t aio_offset;
+
+ /* extra parameters */
+ uint64_t aio_reserved2; /* TODO: use this for a (struct sigevent *) */
+
+ /* flags for the "struct iocb" */
+ int aio_flags;
+
+ /*
+ * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
+ * eventfd to signal AIO readiness to
+ */
+ int aio_resfd;
+
+} ink_aiocb_t;
+
+typedef struct ink_io_event {
+ PADDEDPtr(void *data, _pad1); /* the data field from the iocb */
+ PADDEDPtr(ink_aiocb_t *obj, _pad2); /* what iocb this event came from */
+ PADDEDul(res, _pad3); /* result code for this event */
+ PADDEDul(res2, _pad4); /* secondary result */
+} ink_io_event_t;
+
+TS_INLINE int io_setup(unsigned nr, aio_context_t *ctxp)
+{
+ return syscall(__NR_io_setup, nr, ctxp);
+}
+
+TS_INLINE int io_destroy(aio_context_t ctx)
+{
+ return syscall(__NR_io_destroy, ctx);
+}
+
+TS_INLINE int io_submit(aio_context_t ctx, long nr, ink_aiocb_t **iocbpp)
+{
+ return syscall(__NR_io_submit, ctx, nr, iocbpp);
+}
+
+TS_INLINE int io_getevents(aio_context_t ctx, long min_nr, long max_nr,
+ ink_io_event_t *events, struct timespec *timeout)
+{
+ return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
+}
+
+struct AIOVec: public Continuation
+{
+ Action action;
+ int size;
+ int completed;
+
+ AIOVec(int sz, Continuation *c): Continuation(new_ProxyMutex()), size(sz), completed(0)
+ {
+ action = c;
+ SET_HANDLER(&AIOVec::mainEvent);
+ }
+ int mainEvent(int event, Event *e);
+};
+#else
+typedef ink_aiocb ink_aiocb_t;
+bool ink_aio_thread_num_set(int thread_num);
+#endif
// AIOCallback::thread special values
#define AIO_CALLBACK_THREAD_ANY ((EThread*)0) // any regular event thread
#define AIO_CALLBACK_THREAD_AIO ((EThread*)-1)
@@ -71,12 +177,33 @@ struct AIOCallback: public Continuation
}
};
+#if AIO_MODE == AIO_MODE_NATIVE
+struct DiskHandler: public Continuation
+{
+ Event *trigger_event;
+ aio_context_t ctx;
+ ink_io_event_t events[MAX_AIO_EVENTS];
+ Que(AIOCallback, link) ready_list;
+ Que(AIOCallback, link) complete_list;
+ int startAIOEvent(int event, Event *e);
+ int mainAIOEvent(int event, Event *e);
+ DiskHandler() {
+ SET_HANDLER(&DiskHandler::startAIOEvent);
+ memset(&ctx, 0, sizeof(aio_context_t));
+ int ret = io_setup(MAX_AIO_EVENTS, &ctx);
+ if (ret < 0) {
+ perror("io_setup error");
+ }
+ }
+};
+#endif
void ink_aio_init(ModuleVersion version);
int ink_aio_start();
void ink_aio_set_callback(Continuation * error_callback);
int ink_aio_read(AIOCallback *op, int fromAPI = 0); // fromAPI is a boolean to indicate if this is from a API call such as upload proxy feature
int ink_aio_write(AIOCallback *op, int fromAPI = 0);
-bool ink_aio_thread_num_set(int thread_num);
+int ink_aio_readv(AIOCallback *op, int fromAPI = 0); // fromAPI is a boolean to indicate if this is from a API call such as upload proxy feature
+int ink_aio_writev(AIOCallback *op, int fromAPI = 0);
AIOCallback *new_AIOCallback(void);
#endif
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/P_AIO.h
----------------------------------------------------------------------
diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h
index 2d686e3..9e3321c 100644
--- a/iocore/aio/P_AIO.h
+++ b/iocore/aio/P_AIO.h
@@ -41,6 +41,58 @@
#define AIO_MODULE_VERSION makeModuleVersion(AIO_MODULE_MAJOR_VERSION,\
AIO_MODULE_MINOR_VERSION,\
PRIVATE_MODULE_HEADER)
+
+TS_INLINE int
+AIOCallback::ok()
+{
+ return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
+}
+
+#if AIO_MODE == AIO_MODE_NATIVE
+
+extern Continuation *aio_err_callbck;
+
+struct AIOCallbackInternal: public AIOCallback
+{
+ int io_complete(int event, void *data);
+ AIOCallbackInternal()
+ {
+ memset ((char *) &(this->aiocb), 0, sizeof(this->aiocb));
+ SET_HANDLER(&AIOCallbackInternal::io_complete);
+ }
+};
+
+TS_INLINE int
+AIOCallbackInternal::io_complete(int event, void *data)
+{
+ (void) event;
+ (void) data;
+
+ if (!ok() && aio_err_callbck)
+ eventProcessor.schedule_imm(aio_err_callbck, ET_CALL, AIO_EVENT_DONE);
+ mutex = action.mutex;
+ MUTEX_LOCK(lock, mutex, this_ethread());
+ if (!action.cancelled)
+ action.continuation->handleEvent(AIO_EVENT_DONE, this);
+ return EVENT_DONE;
+}
+
+TS_INLINE int
+AIOVec::mainEvent(int event, Event *e) {
+ ++completed;
+ if (completed < size)
+ return EVENT_CONT;
+ else if (completed == size) {
+ MUTEX_LOCK(lock, action.mutex, this_ethread());
+ if (!action.cancelled)
+ action.continuation->handleEvent(AIO_EVENT_DONE, this);
+ delete this;
+ return EVENT_DONE;
+ }
+ ink_assert(!"AIOVec mainEvent err");
+ return EVENT_ERROR;
+}
+#else
struct AIO_Reqs;
struct AIOCallbackInternal: public AIOCallback
@@ -61,12 +113,6 @@ struct AIOCallbackInternal: public AIOCallback
};
TS_INLINE int
-AIOCallback::ok()
-{
- return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
-}
-
-TS_INLINE int
AIOCallbackInternal::io_complete(int event, void *data)
{
(void) event;
@@ -92,6 +138,7 @@ struct AIO_Reqs
volatile int requests_queued;
};
+#endif // AIO_MODE == AIO_MODE_NATIVE
#ifdef AIO_STATS
class AIOTestData:public Continuation
{
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/Cache.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
index 1232a11..d5ba29f 100644
--- a/iocore/cache/Cache.cc
+++ b/iocore/cache/Cache.cc
@@ -146,6 +146,51 @@ struct VolInitInfo
}
};
+#if AIO_MODE == AIO_MODE_NATIVE
+struct VolInit : public Continuation
+{
+ Vol *vol;
+ char *path;
+ off_t blocks;
+ int64_t offset;
+ bool vol_clear;
+
+ int mainEvent(int event, Event *e) {
+ vol->init(path, blocks, offset, vol_clear);
+ mutex.clear();
+ delete this;
+ return EVENT_DONE;
+ }
+
+ VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) : Continuation(v->mutex),
+ vol(v), path(p), blocks(b), offset(o), vol_clear(c) {
+ SET_HANDLER(&VolInit::mainEvent);
+ }
+};
+
+struct DiskInit : public Continuation
+{
+ CacheDisk *disk;
+ char *s;
+ off_t blocks;
+ off_t askip;
+ int ahw_sector_size;
+ int fildes;
+ bool clear;
+
+ int mainEvent(int event, Event *e) {
+ disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
+ mutex.clear();
+ delete this;
+ return EVENT_DONE;
+ }
+
+ DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector, int f, bool c) : Continuation(d->mutex),
+ disk(d), s(str), blocks(b), askip(skip), ahw_sector_size(sector), fildes(f), clear(c) {
+ SET_HANDLER(&DiskInit::mainEvent);
+ }
+};
+#endif
void cplist_init();
static void cplist_update();
int cplist_reconfigure();
@@ -530,6 +575,16 @@ CacheProcessor::start_internal(int flags)
verify_cache_api();
#endif
+#if AIO_MODE == AIO_MODE_NATIVE
+ int etype = ET_NET;
+ int n_netthreads = eventProcessor.n_threads_for_type[etype];
+ EThread **netthreads = eventProcessor.eventthread[etype];
+ for (int i = 0; i < n_netthreads; ++i) {
+ netthreads[i]->diskHandler = new DiskHandler();
+ netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
+ }
+#endif
+
start_internal_flags = flags;
clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
fix = !!(flags & PROCESSOR_FIX);
@@ -593,7 +648,11 @@ CacheProcessor::start_internal(int flags)
}
off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset));
blocks = blocks - ROUND_TO_STORE_BLOCK(sd->offset + skip);
+#if AIO_MODE == AIO_MODE_NATIVE
+ eventProcessor.schedule_imm(NEW(new DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd, clear)));
+#else
gdisks[gndisks]->open(path, blocks, skip, sector_size, fd, clear);
+#endif
gndisks++;
}
} else {
@@ -1109,8 +1168,11 @@ Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
aio->thread = AIO_CALLBACK_THREAD_ANY;
aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : 0;
}
-
+#if AIO_MODE == AIO_MODE_NATIVE
+ ink_assert(ink_aio_readv(init_info->vol_aio));
+#else
ink_assert(ink_aio_read(init_info->vol_aio));
+#endif
return 0;
}
@@ -1440,7 +1502,11 @@ Ldone:{
init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;
SET_HANDLER(&Vol::handle_recover_write_dir);
+#if AIO_MODE == AIO_MODE_NATIVE
+ ink_assert(ink_aio_writev(init_info->vol_aio));
+#else
ink_assert(ink_aio_write(init_info->vol_aio));
+#endif
return EVENT_CONT;
}
@@ -1812,7 +1878,11 @@ Cache::open(bool clear, bool fix) {
blocks = q->b->len;
bool vol_clear = clear || d->cleared || q->new_block;
+#if AIO_MODE == AIO_MODE_NATIVE
+ eventProcessor.schedule_imm(NEW(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear)));
+#else
cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear);
+#endif
vol_no++;
cache_size += blocks;
}
@@ -1926,7 +1996,7 @@ CacheVC::handleReadDone(int event, Event *e) {
if (checksum != doc->checksum) {
Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
doc->first_key.b[0], doc->first_key.b[1],
- doc->len, doc->hlen, vol->path, io.aiocb.aio_offset, io.aiocb.aio_nbytes);
+ doc->len, doc->hlen, vol->path, io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
doc->magic = DOC_CORRUPT;
okay = 0;
}
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/CacheVol.cc
----------------------------------------------------------------------
diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
index bf4fad9..ccd2009 100644
--- a/iocore/cache/CacheVol.cc
+++ b/iocore/cache/CacheVol.cc
@@ -365,7 +365,7 @@ Lread:
offset = 0;
ink_assert(ink_aio_read(&io) >= 0);
Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu", this,
- (int64_t)io.aiocb.aio_offset, io.aiocb.aio_nbytes);
+ (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
return EVENT_CONT;
Ldone:
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_aiocb.h
----------------------------------------------------------------------
diff --git a/lib/ts/ink_aiocb.h b/lib/ts/ink_aiocb.h
index 1cf9d38..543243b 100644
--- a/lib/ts/ink_aiocb.h
+++ b/lib/ts/ink_aiocb.h
@@ -39,7 +39,7 @@
#define LIO_READ 0x1
#define LIO_WRITE 0x2
-typedef struct ink_aiocb
+struct ink_aiocb
{
int aio_fildes;
#if defined(__STDC__)
@@ -58,6 +58,6 @@ typedef struct ink_aiocb
// aio_result_t aio_resultp; /* results */
int aio_state; /* state flag for List I/O */
int aio__pad[1]; /* extension padding */
-} ink_aiocb_t;
+};
#endif
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_config.h.in
----------------------------------------------------------------------
diff --git a/lib/ts/ink_config.h.in b/lib/ts/ink_config.h.in
index 6b0525c..93e393c 100644
--- a/lib/ts/ink_config.h.in
+++ b/lib/ts/ink_config.h.in
@@ -122,6 +122,7 @@
#define TS_USE_RECLAIMABLE_FREELIST @use_reclaimable_freelist@
#define TS_USE_TLS_NPN @use_tls_npn@
#define TS_USE_TLS_SNI @use_tls_sni@
+#define TS_USE_LINUX_NATIVE_AIO @use_linux_native_aio@
/* OS API definitions */
#define GETHOSTBYNAME_R_HOSTENT_DATA @gethostbyname_r_hostent_data@
http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/proxy/InkAPI.cc
----------------------------------------------------------------------
diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
index 661a2d0..fd8258e 100644
--- a/proxy/InkAPI.cc
+++ b/proxy/InkAPI.cc
@@ -7324,10 +7324,14 @@ TSAIOWrite(int fd, off_t offset, char* buf, const size_t bufSize, TSCont contp)
TSReturnCode
TSAIOThreadNumSet(int thread_num)
{
+#if AIO_MODE == AIO_MODE_NATIVE
+ return TS_SUCCESS;
+#else
if (ink_aio_thread_num_set(thread_num))
return TS_SUCCESS;
return TS_ERROR;
+#endif
}
void
Re: git commit: TS-1760: Option to use Linux native AIO
Posted by Igor Galić <i....@brainsware.org>.
----- Original Message -----
> Updated Branches:
> refs/heads/master 84df57e0e -> c95298dfc
>
>
> TS-1760: Option to use Linux native AIO
>
> to enable the Linux Native AIO, be sure to check Linux kernel AIO
> supporting and use '--use_linux_native_aio' configure directive.
>
> in the Linux Native AIO, all the IO is managed by system, so
> proxy.config.cache.threads_per_disk have no meaning anymore.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c95298df
> Tree:
> http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c95298df
> Diff:
> http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c95298df
>
> Branch: refs/heads/master
> Commit: c95298dfcc6336cc04f172713f5d41e023d19d51
> Parents: 84df57e
> Author: weijin <ta...@taobao.com>
> Authored: Wed Apr 3 14:59:48 2013 +0800
> Committer: Zhao Yongming <mi...@gmail.com>
> Committed: Wed Apr 3 15:34:53 2013 +0800
>
> ----------------------------------------------------------------------
> configure.ac | 19 +++++
> iocore/aio/AIO.cc | 146
> +++++++++++++++++++++++++++++++++++++++--
> iocore/aio/I_AIO.h | 129
> ++++++++++++++++++++++++++++++++++++-
> iocore/aio/P_AIO.h | 59 +++++++++++++++--
> iocore/cache/Cache.cc | 74 ++++++++++++++++++++-
> iocore/cache/CacheVol.cc | 2 +-
> lib/ts/ink_aiocb.h | 4 +-
> lib/ts/ink_config.h.in | 1 +
> proxy/InkAPI.cc | 4 +
> 9 files changed, 421 insertions(+), 17 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/configure.ac
> ----------------------------------------------------------------------
> diff --git a/configure.ac b/configure.ac
> index 7b30f26..72e60b0 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -398,6 +398,25 @@ TS_ARG_ENABLE_VAR([use], [reclaimable_freelist])
> AC_SUBST(use_reclaimable_freelist)
>
> #
> +# If the OS is linux, we can use '--use_linux_native_aio' option to
> +# replace the aio thread mode. Effective only on the linux system.
> +#
> +
> +if test "x${host_os_def}" = "xlinux"; then
> + AC_MSG_CHECKING([whether to use native aio or not])
> + AC_ARG_ENABLE([linux_native_aio],
> + [AS_HELP_STRING([--enable-linux-native-aio],
> + [turn on linux native aio, only effective on linux system])],
> + [],
> + [enable_linux_native_aio="yes"])
> + AC_MSG_RESULT([$enable_linux_native_aio])
> +else
> + enable_linux_native_aio="no"
> +fi
> +TS_ARG_ENABLE_VAR([use], [linux_native_aio])
> +AC_SUBST(use_linux_native_aio)
> +
> +
> # Configure how many stats to allocate for plugins. Default is 512.
> #
> AC_ARG_WITH([max-api-stats],
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/AIO.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
> index 3086f0c..40bf1b1 100644
> --- a/iocore/aio/AIO.cc
> +++ b/iocore/aio/AIO.cc
> @@ -27,6 +27,10 @@
>
> #include "P_AIO.h"
>
> +#if AIO_MODE == AIO_MODE_NATIVE
> +#define AIO_PERIOD
> -HRTIME_MSECONDS(4)
> +#else
> +
> #define MAX_DISKS_POSSIBLE 100
>
> // globals
> @@ -36,24 +40,24 @@ int ts_config_with_inkdiskio = 0;
> AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
> /* number of unique file descriptors in the aio_reqs array */
> volatile int num_filedes = 1;
> -RecRawStatBlock *aio_rsb = NULL;
> +
> // acquire this mutex before inserting a new entry in the aio_reqs
> array.
> // Don't need to acquire this for searching the array
> static ink_mutex insert_mutex;
> -Continuation *aio_err_callbck = 0;
> +
> RecInt cache_config_threads_per_disk = 12;
> RecInt api_config_threads_per_disk = 12;
> int thread_is_created = 0;
> +#endif // AIO_MODE == AIO_MODE_NATIVE
>
> -
> +RecRawStatBlock *aio_rsb = NULL;
> +Continuation *aio_err_callbck = 0;
> // AIO Stats
> uint64_t aio_num_read = 0;
> uint64_t aio_bytes_read = 0;
> uint64_t aio_num_write = 0;
> uint64_t aio_bytes_written = 0;
>
> -static void aio_move(AIO_Reqs *req);
> -
> /*
> * Stats
> */
> @@ -156,10 +160,12 @@ ink_aio_init(ModuleVersion v)
> RecRegisterRawStat(aio_rsb, RECT_PROCESS,
> "proxy.process.cache.KB_write_per_sec",
> RECD_FLOAT, RECP_NULL, (int)
> AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
> +#if AIO_MODE != AIO_MODE_NATIVE
> memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
> ink_mutex_init(&insert_mutex, NULL);
>
> REC_ReadConfigInteger(cache_config_threads_per_disk,
> "proxy.config.cache.threads_per_disk");
> +#endif
> }
>
> int
> @@ -172,6 +178,7 @@ ink_aio_start()
> return 0;
> }
>
> +#if AIO_MODE != AIO_MODE_NATIVE
>
> static void *aio_thread_main(void *arg);
>
> @@ -534,3 +541,132 @@ aio_thread_main(void *arg)
> }
> return 0;
> }
> +#else
> +int
> +DiskHandler::startAIOEvent(int event, Event *e) {
> + SET_HANDLER(&DiskHandler::mainAIOEvent);
> + e->schedule_every(AIO_PERIOD);
> + trigger_event = e;
> + return EVENT_CONT;
> +}
> +
> +int
> +DiskHandler::mainAIOEvent(int event, Event *e) {
> + AIOCallback *op = NULL;
> +Lagain:
> + int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);
> + //printf("%d\n", ret);
> + for (int i = 0; i < ret; i++) {
> + op = (AIOCallback *) events[i].data;
> + op->aio_result = events[i].res;
> + ink_assert(op->action.continuation);
> + complete_list.enqueue(op);
> + //op->handleEvent(event, e);
> + }
> + if (ret == MAX_AIO_EVENTS)
> + goto Lagain;
Why is this not a simple do { } while (ret == MAX_AIO_EVENTS); ?
> + if (ret < 0)
> + perror("io_getevents");
> +
> + ink_aiocb_t *cbs[MAX_AIO_EVENTS];
> + int num = 0;
> + for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) !=
> NULL); ++num) {
> + cbs[num] = &op->aiocb;
> + ink_debug_assert(op->action.continuation);
> + }
> + if (num > 0) {
> + int ret;
> + do {
> + ret = io_submit(ctx, num, cbs);
> + } while (ret < 0 && errno == EAGAIN);
> +
> + if (ret != num) {
> + if (ret < 0)
> + perror("io_submit error");
> + else {
> + fprintf(stderr, "could not sumbit IOs");
> + ink_debug_assert(0);
> + }
> + }
> + }
> +
> + while ((op = complete_list.dequeue()) != NULL) {
> + op->handleEvent(event, e);
> + }
> + return EVENT_CONT;
> +}
> +
> +int
> +ink_aio_read(AIOCallback *op, int fromAPI) {
> + op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> + op->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
> + op->aiocb.aio_data = op;
> + this_ethread()->diskHandler->ready_list.enqueue(op);
> +
> + return 1;
> +}
> +
> +int
> +ink_aio_write(AIOCallback *op, int fromAPI) {
> + op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> + op->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
> + op->aiocb.aio_data = op;
> + this_ethread()->diskHandler->ready_list.enqueue(op);
> +
> + return 1;
> +}
> +
> +int
> +ink_aio_readv(AIOCallback *op, int fromAPI) {
> + DiskHandler *dh = this_ethread()->diskHandler;
> + AIOCallback *io = op;
> + int sz = 0;
> +
> + while (io) {
> + io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> + io->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
> + io->aiocb.aio_data = io;
> + dh->ready_list.enqueue(io);
> + ++sz;
> + io = io->then;
> + }
> +
> + if (sz > 1) {
> + ink_debug_assert(op->action.continuation);
> + AIOVec *vec = new AIOVec(sz, op->action.continuation);
> + vec->action = op->action.continuation;
> + while (--sz >= 0) {
> + op->action = vec;
> + op = op->then;
> + }
> + }
> + return 1;
> +}
> +
> +int
> +ink_aio_writev(AIOCallback *op, int fromAPI) {
> + DiskHandler *dh = this_ethread()->diskHandler;
> + AIOCallback *io = op;
> + int sz = 0;
> +
> + while (io) {
> + io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> + io->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
> + io->aiocb.aio_data = io;
> + dh->ready_list.enqueue(io);
> + ++sz;
> + io = io->then;
> + }
> +
> + if (sz > 1) {
> + ink_debug_assert(op->action.continuation);
> + AIOVec *vec = new AIOVec(sz, op->action.continuation);
> + vec->action = op->action.continuation;
> + while (--sz >= 0) {
> + op->action = vec;
> + op = op->then;
> + }
> + }
> + return 1;
> +}
> +#endif // AIO_MODE != AIO_MODE_NATIVE
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/I_AIO.h
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h
> index 8d4cd2b..ccdc078 100644
> --- a/iocore/aio/I_AIO.h
> +++ b/iocore/aio/I_AIO.h
> @@ -46,8 +46,114 @@
> #define AIO_MODE_AIO 0
> #define AIO_MODE_SYNC 1
> #define AIO_MODE_THREAD 2
> +#define AIO_MODE_NATIVE 3
> +#if use_linux_native_aio
> +#define AIO_MODE AIO_MODE_NATIVE
> +#else
> #define AIO_MODE AIO_MODE_THREAD
> +#endif
> +
> +#if AIO_MODE == AIO_MODE_NATIVE
> +
> +#include <sys/syscall.h> /* for __NR_* definitions */
> +#include <linux/aio_abi.h> /* for AIO types and constants */
> +#define MAX_AIO_EVENTS 1024
> +
> +#if defined(__LITTLE_ENDIAN)
> +#if (SIZEOF_VOID_POINTER == 4)
> +#define PADDEDPtr(x, y) x; unsigned y
> +#define PADDEDul(x, y) unsigned long x; unsigned y
> +#elif (SIZEOF_VOID_POINTER == 8)
> +#define PADDEDPtr(x, y) x
> +#define PADDEDul(x, y) unsigned long x
> +#endif
> +#elif defined(__BIG_ENDIAN)
> +#if (SIZEOF_VOID_POINTER == 4)
> +#define PADDEDPtr(x, y) unsigned y; x
> +#define PADDEDul(x, y) unsigned y; unsigned long y
> +#elif (SIZEOF_VOID_POINTER == 8)
> +#define PADDEDPtr(x, y) x
> +#define PADDEDul(x, y) unsigned long x
> +#endif
> +#else
> +#error edit for your odd byteorder.
> +#endif
> +
> +typedef struct ink_iocb {
> + /* these are internal to the kernel/libc. */
> + PADDEDPtr(void *aio_data, _pad1); /* data to be returned in
> event's data */
> + unsigned PADDED(aio_key, aio_reserved1);
> + /* the kernel sets aio_key to the req # */
> +
> + /* common fields */
> + short aio_lio_opcode; /* see IOCB_CMD_ above */
> + short aio_reqprio;
> + int aio_fildes;
> +
> + PADDEDPtr(void *aio_buf, _pad2);
> + PADDEDul(aio_nbytes, _pad3);
> + int64_t aio_offset;
> +
> + /* extra parameters */
> + uint64_t aio_reserved2; /* TODO: use this for a (struct sigevent
> *) */
> +
> + /* flags for the "struct iocb" */
> + int aio_flags;
> +
> + /*
> + * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
> + * eventfd to signal AIO readiness to
> + */
> + int aio_resfd;
> +
> +} ink_aiocb_t;
> +
> +typedef struct ink_io_event {
> + PADDEDPtr(void *data, _pad1); /* the data field from the iocb */
> + PADDEDPtr(ink_aiocb_t *obj, _pad2); /* what iocb this event
> came from */
> + PADDEDul(res, _pad3); /* result code for this event */
> + PADDEDul(res2, _pad4); /* secondary result */
> +} ink_io_event_t;
> +
> +TS_INLINE int io_setup(unsigned nr, aio_context_t *ctxp)
> +{
> + return syscall(__NR_io_setup, nr, ctxp);
> +}
> +
> +TS_INLINE int io_destroy(aio_context_t ctx)
> +{
> + return syscall(__NR_io_destroy, ctx);
> +}
> +
> +TS_INLINE int io_submit(aio_context_t ctx, long nr, ink_aiocb_t
> **iocbpp)
> +{
> + return syscall(__NR_io_submit, ctx, nr, iocbpp);
> +}
> +
> +TS_INLINE int io_getevents(aio_context_t ctx, long min_nr, long
> max_nr,
> + ink_io_event_t *events, struct timespec *timeout)
> +{
> + return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events,
> timeout);
> +}
> +
> +struct AIOVec: public Continuation
> +{
> + Action action;
> + int size;
> + int completed;
> +
> + AIOVec(int sz, Continuation *c): Continuation(new_ProxyMutex()),
> size(sz), completed(0)
> + {
> + action = c;
> + SET_HANDLER(&AIOVec::mainEvent);
> + }
>
> + int mainEvent(int event, Event *e);
> +};
> +#else
> +typedef ink_aiocb ink_aiocb_t;
> +bool ink_aio_thread_num_set(int thread_num);
> +#endif
> // AIOCallback::thread special values
> #define AIO_CALLBACK_THREAD_ANY ((EThread*)0) // any regular event
> thread
> #define AIO_CALLBACK_THREAD_AIO ((EThread*)-1)
> @@ -71,12 +177,33 @@ struct AIOCallback: public Continuation
> }
> };
>
> +#if AIO_MODE == AIO_MODE_NATIVE
> +struct DiskHandler: public Continuation
> +{
> + Event *trigger_event;
> + aio_context_t ctx;
> + ink_io_event_t events[MAX_AIO_EVENTS];
> + Que(AIOCallback, link) ready_list;
> + Que(AIOCallback, link) complete_list;
> + int startAIOEvent(int event, Event *e);
> + int mainAIOEvent(int event, Event *e);
> + DiskHandler() {
> + SET_HANDLER(&DiskHandler::startAIOEvent);
> + memset(&ctx, 0, sizeof(aio_context_t));
> + int ret = io_setup(MAX_AIO_EVENTS, &ctx);
> + if (ret < 0) {
> + perror("io_setup error");
> + }
> + }
> +};
> +#endif
> void ink_aio_init(ModuleVersion version);
> int ink_aio_start();
> void ink_aio_set_callback(Continuation * error_callback);
>
> int ink_aio_read(AIOCallback *op, int fromAPI = 0); // fromAPI is
> a boolean to indicate if this is from a API call such as upload
> proxy feature
> int ink_aio_write(AIOCallback *op, int fromAPI = 0);
> -bool ink_aio_thread_num_set(int thread_num);
> +int ink_aio_readv(AIOCallback *op, int fromAPI = 0); // fromAPI is
> a boolean to indicate if this is from a API call such as upload
> proxy feature
> +int ink_aio_writev(AIOCallback *op, int fromAPI = 0);
> AIOCallback *new_AIOCallback(void);
> #endif
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/P_AIO.h
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h
> index 2d686e3..9e3321c 100644
> --- a/iocore/aio/P_AIO.h
> +++ b/iocore/aio/P_AIO.h
> @@ -41,6 +41,58 @@
> #define AIO_MODULE_VERSION
> makeModuleVersion(AIO_MODULE_MAJOR_VERSION,\
> AIO_MODULE_MINOR_VERSION,\
> PRIVATE_MODULE_HEADER)
> +
> +TS_INLINE int
> +AIOCallback::ok()
> +{
> + return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
> +}
> +
> +#if AIO_MODE == AIO_MODE_NATIVE
> +
> +extern Continuation *aio_err_callbck;
> +
> +struct AIOCallbackInternal: public AIOCallback
> +{
> + int io_complete(int event, void *data);
> + AIOCallbackInternal()
> + {
> + memset ((char *) &(this->aiocb), 0, sizeof(this->aiocb));
> + SET_HANDLER(&AIOCallbackInternal::io_complete);
> + }
> +};
> +
> +TS_INLINE int
> +AIOCallbackInternal::io_complete(int event, void *data)
> +{
> + (void) event;
> + (void) data;
> +
> + if (!ok() && aio_err_callbck)
> + eventProcessor.schedule_imm(aio_err_callbck, ET_CALL,
> AIO_EVENT_DONE);
> + mutex = action.mutex;
> + MUTEX_LOCK(lock, mutex, this_ethread());
> + if (!action.cancelled)
> + action.continuation->handleEvent(AIO_EVENT_DONE, this);
> + return EVENT_DONE;
> +}
> +
> +TS_INLINE int
> +AIOVec::mainEvent(int event, Event *e) {
> + ++completed;
> + if (completed < size)
> + return EVENT_CONT;
> + else if (completed == size) {
> + MUTEX_LOCK(lock, action.mutex, this_ethread());
> + if (!action.cancelled)
> + action.continuation->handleEvent(AIO_EVENT_DONE, this);
> + delete this;
> + return EVENT_DONE;
> + }
> + ink_assert(!"AIOVec mainEvent err");
> + return EVENT_ERROR;
> +}
> +#else
> struct AIO_Reqs;
>
> struct AIOCallbackInternal: public AIOCallback
> @@ -61,12 +113,6 @@ struct AIOCallbackInternal: public AIOCallback
> };
>
> TS_INLINE int
> -AIOCallback::ok()
> -{
> - return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
> -}
> -
> -TS_INLINE int
> AIOCallbackInternal::io_complete(int event, void *data)
> {
> (void) event;
> @@ -92,6 +138,7 @@ struct AIO_Reqs
> volatile int requests_queued;
> };
>
> +#endif // AIO_MODE == AIO_MODE_NATIVE
> #ifdef AIO_STATS
> class AIOTestData:public Continuation
> {
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/Cache.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
> index 1232a11..d5ba29f 100644
> --- a/iocore/cache/Cache.cc
> +++ b/iocore/cache/Cache.cc
> @@ -146,6 +146,51 @@ struct VolInitInfo
> }
> };
>
> +#if AIO_MODE == AIO_MODE_NATIVE
> +struct VolInit : public Continuation
> +{
> + Vol *vol;
> + char *path;
> + off_t blocks;
> + int64_t offset;
> + bool vol_clear;
> +
> + int mainEvent(int event, Event *e) {
> + vol->init(path, blocks, offset, vol_clear);
> + mutex.clear();
> + delete this;
> + return EVENT_DONE;
> + }
> +
> + VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) :
> Continuation(v->mutex),
> + vol(v), path(p), blocks(b), offset(o), vol_clear(c) {
> + SET_HANDLER(&VolInit::mainEvent);
> + }
> +};
> +
> +struct DiskInit : public Continuation
> +{
> + CacheDisk *disk;
> + char *s;
> + off_t blocks;
> + off_t askip;
> + int ahw_sector_size;
> + int fildes;
> + bool clear;
> +
> + int mainEvent(int event, Event *e) {
> + disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
> + mutex.clear();
> + delete this;
> + return EVENT_DONE;
> + }
> +
> + DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector,
> int f, bool c) : Continuation(d->mutex),
> + disk(d), s(str), blocks(b), askip(skip),
> ahw_sector_size(sector), fildes(f), clear(c) {
> + SET_HANDLER(&DiskInit::mainEvent);
> + }
> +};
> +#endif
> void cplist_init();
> static void cplist_update();
> int cplist_reconfigure();
> @@ -530,6 +575,16 @@ CacheProcessor::start_internal(int flags)
> verify_cache_api();
> #endif
>
> +#if AIO_MODE == AIO_MODE_NATIVE
> + int etype = ET_NET;
> + int n_netthreads = eventProcessor.n_threads_for_type[etype];
> + EThread **netthreads = eventProcessor.eventthread[etype];
> + for (int i = 0; i < n_netthreads; ++i) {
> + netthreads[i]->diskHandler = new DiskHandler();
> + netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
> + }
> +#endif
> +
> start_internal_flags = flags;
> clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
> fix = !!(flags & PROCESSOR_FIX);
> @@ -593,7 +648,11 @@ CacheProcessor::start_internal(int flags)
> }
> off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ?
> START_POS + sd->alignment : sd->offset));
> blocks = blocks - ROUND_TO_STORE_BLOCK(sd->offset + skip);
> +#if AIO_MODE == AIO_MODE_NATIVE
> + eventProcessor.schedule_imm(NEW(new
> DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd,
> clear)));
> +#else
> gdisks[gndisks]->open(path, blocks, skip, sector_size, fd,
> clear);
> +#endif
> gndisks++;
> }
> } else {
> @@ -1109,8 +1168,11 @@ Vol::init(char *s, off_t blocks, off_t
> dir_skip, bool clear)
> aio->thread = AIO_CALLBACK_THREAD_ANY;
> aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : 0;
> }
> -
> +#if AIO_MODE == AIO_MODE_NATIVE
> + ink_assert(ink_aio_readv(init_info->vol_aio));
> +#else
> ink_assert(ink_aio_read(init_info->vol_aio));
> +#endif
> return 0;
> }
>
> @@ -1440,7 +1502,11 @@ Ldone:{
> init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen -
> footerlen;
>
> SET_HANDLER(&Vol::handle_recover_write_dir);
> +#if AIO_MODE == AIO_MODE_NATIVE
> + ink_assert(ink_aio_writev(init_info->vol_aio));
> +#else
> ink_assert(ink_aio_write(init_info->vol_aio));
> +#endif
> return EVENT_CONT;
> }
>
> @@ -1812,7 +1878,11 @@ Cache::open(bool clear, bool fix) {
> blocks = q->b->len;
>
> bool vol_clear = clear || d->cleared || q->new_block;
> +#if AIO_MODE == AIO_MODE_NATIVE
> + eventProcessor.schedule_imm(NEW(new
> VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset,
> vol_clear)));
> +#else
> cp->vols[vol_no]->init(d->path, blocks, q->b->offset,
> vol_clear);
> +#endif
> vol_no++;
> cache_size += blocks;
> }
> @@ -1926,7 +1996,7 @@ CacheVC::handleReadDone(int event, Event *e) {
> if (checksum != doc->checksum) {
> Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "]
> len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
> doc->first_key.b[0], doc->first_key.b[1],
> - doc->len, doc->hlen, vol->path, io.aiocb.aio_offset,
> io.aiocb.aio_nbytes);
> + doc->len, doc->hlen, vol->path, io.aiocb.aio_offset,
> (size_t)io.aiocb.aio_nbytes);
> doc->magic = DOC_CORRUPT;
> okay = 0;
> }
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/CacheVol.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
> index bf4fad9..ccd2009 100644
> --- a/iocore/cache/CacheVol.cc
> +++ b/iocore/cache/CacheVol.cc
> @@ -365,7 +365,7 @@ Lread:
> offset = 0;
> ink_assert(ink_aio_read(&io) >= 0);
> Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu",
> this,
> - (int64_t)io.aiocb.aio_offset, io.aiocb.aio_nbytes);
> + (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
> return EVENT_CONT;
>
> Ldone:
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_aiocb.h
> ----------------------------------------------------------------------
> diff --git a/lib/ts/ink_aiocb.h b/lib/ts/ink_aiocb.h
> index 1cf9d38..543243b 100644
> --- a/lib/ts/ink_aiocb.h
> +++ b/lib/ts/ink_aiocb.h
> @@ -39,7 +39,7 @@
> #define LIO_READ 0x1
> #define LIO_WRITE 0x2
>
> -typedef struct ink_aiocb
> +struct ink_aiocb
> {
> int aio_fildes;
> #if defined(__STDC__)
> @@ -58,6 +58,6 @@ typedef struct ink_aiocb
> // aio_result_t aio_resultp; /* results */
> int aio_state; /* state flag for List I/O */
> int aio__pad[1]; /* extension padding */
> -} ink_aiocb_t;
> +};
>
> #endif
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_config.h.in
> ----------------------------------------------------------------------
> diff --git a/lib/ts/ink_config.h.in b/lib/ts/ink_config.h.in
> index 6b0525c..93e393c 100644
> --- a/lib/ts/ink_config.h.in
> +++ b/lib/ts/ink_config.h.in
> @@ -122,6 +122,7 @@
> #define TS_USE_RECLAIMABLE_FREELIST @use_reclaimable_freelist@
> #define TS_USE_TLS_NPN @use_tls_npn@
> #define TS_USE_TLS_SNI @use_tls_sni@
> +#define TS_USE_LINUX_NATIVE_AIO @use_linux_native_aio@
>
> /* OS API definitions */
> #define GETHOSTBYNAME_R_HOSTENT_DATA
> @gethostbyname_r_hostent_data@
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/proxy/InkAPI.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
> index 661a2d0..fd8258e 100644
> --- a/proxy/InkAPI.cc
> +++ b/proxy/InkAPI.cc
> @@ -7324,10 +7324,14 @@ TSAIOWrite(int fd, off_t offset, char* buf,
> const size_t bufSize, TSCont contp)
> TSReturnCode
> TSAIOThreadNumSet(int thread_num)
> {
> +#if AIO_MODE == AIO_MODE_NATIVE
> + return TS_SUCCESS;
> +#else
> if (ink_aio_thread_num_set(thread_num))
> return TS_SUCCESS;
>
> return TS_ERROR;
> +#endif
> }
>
> void
>
>
--
Igor Galić
Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE
Re: git commit: TS-1760: Option to use Linux native AIO
Posted by Igor Galić <i....@brainsware.org>.
----- Original Message -----
> Updated Branches:
> refs/heads/master 84df57e0e -> c95298dfc
>
>
> TS-1760: Option to use Linux native AIO
>
> To enable Linux native AIO, make sure your Linux kernel supports AIO
> and use the '--use_linux_native_aio' configure directive.
As far as ./configure options go, --enable-linux-native-aio or --with-linux-native-aio
is probably a better choice.
> With Linux native AIO, all I/O is managed by the system, so
> proxy.config.cache.threads_per_disk has no meaning anymore.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo
> Commit:
> http://git-wip-us.apache.org/repos/asf/trafficserver/commit/c95298df
> Tree:
> http://git-wip-us.apache.org/repos/asf/trafficserver/tree/c95298df
> Diff:
> http://git-wip-us.apache.org/repos/asf/trafficserver/diff/c95298df
>
> Branch: refs/heads/master
> Commit: c95298dfcc6336cc04f172713f5d41e023d19d51
> Parents: 84df57e
> Author: weijin <ta...@taobao.com>
> Authored: Wed Apr 3 14:59:48 2013 +0800
> Committer: Zhao Yongming <mi...@gmail.com>
> Committed: Wed Apr 3 15:34:53 2013 +0800
>
> ----------------------------------------------------------------------
> configure.ac | 19 +++++
> iocore/aio/AIO.cc | 146
> +++++++++++++++++++++++++++++++++++++++--
> iocore/aio/I_AIO.h | 129
> ++++++++++++++++++++++++++++++++++++-
> iocore/aio/P_AIO.h | 59 +++++++++++++++--
> iocore/cache/Cache.cc | 74 ++++++++++++++++++++-
> iocore/cache/CacheVol.cc | 2 +-
> lib/ts/ink_aiocb.h | 4 +-
> lib/ts/ink_config.h.in | 1 +
> proxy/InkAPI.cc | 4 +
> 9 files changed, 421 insertions(+), 17 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/configure.ac
> ----------------------------------------------------------------------
> diff --git a/configure.ac b/configure.ac
> index 7b30f26..72e60b0 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -398,6 +398,25 @@ TS_ARG_ENABLE_VAR([use], [reclaimable_freelist])
> AC_SUBST(use_reclaimable_freelist)
>
> #
> +# If the OS is linux, we can use '--use_linux_native_aio' option to
> +# replace the aio thread mode. Effective only on the linux system.
> +#
> +
> +if test "x${host_os_def}" = "xlinux"; then
> + AC_MSG_CHECKING([whether to use native aio or not])
> + AC_ARG_ENABLE([linux_native_aio],
> + [AS_HELP_STRING([--enable-linux-native-aio],
> + [turn on linux native aio, only effective on linux system])],
> + [],
> + [enable_linux_native_aio="yes"])
> + AC_MSG_RESULT([$enable_linux_native_aio])
> +else
> + enable_linux_native_aio="no"
> +fi
> +TS_ARG_ENABLE_VAR([use], [linux_native_aio])
> +AC_SUBST(use_linux_native_aio)
> +
> +
> # Configure how many stats to allocate for plugins. Default is 512.
> #
> AC_ARG_WITH([max-api-stats],
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/AIO.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
> index 3086f0c..40bf1b1 100644
> --- a/iocore/aio/AIO.cc
> +++ b/iocore/aio/AIO.cc
> @@ -27,6 +27,10 @@
>
> #include "P_AIO.h"
>
> +#if AIO_MODE == AIO_MODE_NATIVE
> +#define AIO_PERIOD
> -HRTIME_MSECONDS(4)
> +#else
> +
> #define MAX_DISKS_POSSIBLE 100
>
> // globals
> @@ -36,24 +40,24 @@ int ts_config_with_inkdiskio = 0;
> AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
> /* number of unique file descriptors in the aio_reqs array */
> volatile int num_filedes = 1;
> -RecRawStatBlock *aio_rsb = NULL;
> +
> // acquire this mutex before inserting a new entry in the aio_reqs
> array.
> // Don't need to acquire this for searching the array
> static ink_mutex insert_mutex;
> -Continuation *aio_err_callbck = 0;
> +
> RecInt cache_config_threads_per_disk = 12;
> RecInt api_config_threads_per_disk = 12;
> int thread_is_created = 0;
> +#endif // AIO_MODE == AIO_MODE_NATIVE
>
> -
> +RecRawStatBlock *aio_rsb = NULL;
> +Continuation *aio_err_callbck = 0;
> // AIO Stats
> uint64_t aio_num_read = 0;
> uint64_t aio_bytes_read = 0;
> uint64_t aio_num_write = 0;
> uint64_t aio_bytes_written = 0;
>
> -static void aio_move(AIO_Reqs *req);
> -
> /*
> * Stats
> */
> @@ -156,10 +160,12 @@ ink_aio_init(ModuleVersion v)
> RecRegisterRawStat(aio_rsb, RECT_PROCESS,
> "proxy.process.cache.KB_write_per_sec",
> RECD_FLOAT, RECP_NULL, (int)
> AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
> +#if AIO_MODE != AIO_MODE_NATIVE
> memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
> ink_mutex_init(&insert_mutex, NULL);
>
> REC_ReadConfigInteger(cache_config_threads_per_disk,
> "proxy.config.cache.threads_per_disk");
> +#endif
> }
>
> int
> @@ -172,6 +178,7 @@ ink_aio_start()
> return 0;
> }
>
> +#if AIO_MODE != AIO_MODE_NATIVE
>
> static void *aio_thread_main(void *arg);
>
> @@ -534,3 +541,132 @@ aio_thread_main(void *arg)
> }
> return 0;
> }
> +#else
> +int
> +DiskHandler::startAIOEvent(int event, Event *e) {
> + SET_HANDLER(&DiskHandler::mainAIOEvent);
> + e->schedule_every(AIO_PERIOD);
> + trigger_event = e;
> + return EVENT_CONT;
> +}
> +
> +int
> +DiskHandler::mainAIOEvent(int event, Event *e) {
> + AIOCallback *op = NULL;
> +Lagain:
> + int ret = io_getevents(ctx, 0, MAX_AIO_EVENTS, events, NULL);
> + //printf("%d\n", ret);
> + for (int i = 0; i < ret; i++) {
> + op = (AIOCallback *) events[i].data;
> + op->aio_result = events[i].res;
> + ink_assert(op->action.continuation);
> + complete_list.enqueue(op);
> + //op->handleEvent(event, e);
> + }
> + if (ret == MAX_AIO_EVENTS)
> + goto Lagain;
> + if (ret < 0)
> + perror("io_getevents");
> +
> + ink_aiocb_t *cbs[MAX_AIO_EVENTS];
> + int num = 0;
> + for (; num < MAX_AIO_EVENTS && ((op = ready_list.dequeue()) !=
> NULL); ++num) {
> + cbs[num] = &op->aiocb;
> + ink_debug_assert(op->action.continuation);
> + }
> + if (num > 0) {
> + int ret;
> + do {
> + ret = io_submit(ctx, num, cbs);
> + } while (ret < 0 && errno == EAGAIN);
> +
> + if (ret != num) {
> + if (ret < 0)
> + perror("io_submit error");
> + else {
> + fprintf(stderr, "could not sumbit IOs");
> + ink_debug_assert(0);
> + }
> + }
> + }
> +
> + while ((op = complete_list.dequeue()) != NULL) {
> + op->handleEvent(event, e);
> + }
> + return EVENT_CONT;
> +}
> +
> +int
> +ink_aio_read(AIOCallback *op, int fromAPI) {
> + op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> + op->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
> + op->aiocb.aio_data = op;
> + this_ethread()->diskHandler->ready_list.enqueue(op);
> +
> + return 1;
> +}
> +
> +int
> +ink_aio_write(AIOCallback *op, int fromAPI) {
> + op->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> + op->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
> + op->aiocb.aio_data = op;
> + this_ethread()->diskHandler->ready_list.enqueue(op);
> +
> + return 1;
> +}
> +
> +int
> +ink_aio_readv(AIOCallback *op, int fromAPI) {
> + DiskHandler *dh = this_ethread()->diskHandler;
> + AIOCallback *io = op;
> + int sz = 0;
> +
> + while (io) {
> + io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> + io->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
> + io->aiocb.aio_data = io;
> + dh->ready_list.enqueue(io);
> + ++sz;
> + io = io->then;
> + }
> +
> + if (sz > 1) {
> + ink_debug_assert(op->action.continuation);
> + AIOVec *vec = new AIOVec(sz, op->action.continuation);
> + vec->action = op->action.continuation;
> + while (--sz >= 0) {
> + op->action = vec;
> + op = op->then;
> + }
> + }
> + return 1;
> +}
> +
> +int
> +ink_aio_writev(AIOCallback *op, int fromAPI) {
> + DiskHandler *dh = this_ethread()->diskHandler;
> + AIOCallback *io = op;
> + int sz = 0;
> +
> + while (io) {
> + io->aiocb.aio_reqprio = AIO_DEFAULT_PRIORITY;
> + io->aiocb.aio_lio_opcode = IOCB_CMD_PWRITE;
> + io->aiocb.aio_data = io;
> + dh->ready_list.enqueue(io);
> + ++sz;
> + io = io->then;
> + }
> +
> + if (sz > 1) {
> + ink_debug_assert(op->action.continuation);
> + AIOVec *vec = new AIOVec(sz, op->action.continuation);
> + vec->action = op->action.continuation;
> + while (--sz >= 0) {
> + op->action = vec;
> + op = op->then;
> + }
> + }
> + return 1;
> +}
> +#endif // AIO_MODE != AIO_MODE_NATIVE
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/I_AIO.h
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h
> index 8d4cd2b..ccdc078 100644
> --- a/iocore/aio/I_AIO.h
> +++ b/iocore/aio/I_AIO.h
> @@ -46,8 +46,114 @@
> #define AIO_MODE_AIO 0
> #define AIO_MODE_SYNC 1
> #define AIO_MODE_THREAD 2
> +#define AIO_MODE_NATIVE 3
> +#if use_linux_native_aio
> +#define AIO_MODE AIO_MODE_NATIVE
> +#else
> #define AIO_MODE AIO_MODE_THREAD
> +#endif
> +
> +#if AIO_MODE == AIO_MODE_NATIVE
> +
> +#include <sys/syscall.h> /* for __NR_* definitions */
> +#include <linux/aio_abi.h> /* for AIO types and constants */
> +#define MAX_AIO_EVENTS 1024
> +
> +#if defined(__LITTLE_ENDIAN)
> +#if (SIZEOF_VOID_POINTER == 4)
> +#define PADDEDPtr(x, y) x; unsigned y
> +#define PADDEDul(x, y) unsigned long x; unsigned y
> +#elif (SIZEOF_VOID_POINTER == 8)
> +#define PADDEDPtr(x, y) x
> +#define PADDEDul(x, y) unsigned long x
> +#endif
> +#elif defined(__BIG_ENDIAN)
> +#if (SIZEOF_VOID_POINTER == 4)
> +#define PADDEDPtr(x, y) unsigned y; x
> +#define PADDEDul(x, y) unsigned y; unsigned long y
> +#elif (SIZEOF_VOID_POINTER == 8)
> +#define PADDEDPtr(x, y) x
> +#define PADDEDul(x, y) unsigned long x
> +#endif
> +#else
> +#error edit for your odd byteorder.
> +#endif
> +
> +typedef struct ink_iocb {
> + /* these are internal to the kernel/libc. */
> + PADDEDPtr(void *aio_data, _pad1); /* data to be returned in
> event's data */
> + unsigned PADDED(aio_key, aio_reserved1);
> + /* the kernel sets aio_key to the req # */
> +
> + /* common fields */
> + short aio_lio_opcode; /* see IOCB_CMD_ above */
> + short aio_reqprio;
> + int aio_fildes;
> +
> + PADDEDPtr(void *aio_buf, _pad2);
> + PADDEDul(aio_nbytes, _pad3);
> + int64_t aio_offset;
> +
> + /* extra parameters */
> + uint64_t aio_reserved2; /* TODO: use this for a (struct sigevent
> *) */
> +
> + /* flags for the "struct iocb" */
> + int aio_flags;
> +
> + /*
> + * if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
> + * eventfd to signal AIO readiness to
> + */
> + int aio_resfd;
> +
> +} ink_aiocb_t;
> +
> +typedef struct ink_io_event {
> + PADDEDPtr(void *data, _pad1); /* the data field from the iocb */
> + PADDEDPtr(ink_aiocb_t *obj, _pad2); /* what iocb this event
> came from */
> + PADDEDul(res, _pad3); /* result code for this event */
> + PADDEDul(res2, _pad4); /* secondary result */
> +} ink_io_event_t;
> +
> +TS_INLINE int io_setup(unsigned nr, aio_context_t *ctxp)
> +{
> + return syscall(__NR_io_setup, nr, ctxp);
> +}
> +
> +TS_INLINE int io_destroy(aio_context_t ctx)
> +{
> + return syscall(__NR_io_destroy, ctx);
> +}
> +
> +TS_INLINE int io_submit(aio_context_t ctx, long nr, ink_aiocb_t
> **iocbpp)
> +{
> + return syscall(__NR_io_submit, ctx, nr, iocbpp);
> +}
> +
> +TS_INLINE int io_getevents(aio_context_t ctx, long min_nr, long
> max_nr,
> + ink_io_event_t *events, struct timespec *timeout)
> +{
> + return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events,
> timeout);
> +}
> +
> +struct AIOVec: public Continuation
> +{
> + Action action;
> + int size;
> + int completed;
> +
> + AIOVec(int sz, Continuation *c): Continuation(new_ProxyMutex()),
> size(sz), completed(0)
> + {
> + action = c;
> + SET_HANDLER(&AIOVec::mainEvent);
> + }
>
> + int mainEvent(int event, Event *e);
> +};
> +#else
> +typedef ink_aiocb ink_aiocb_t;
> +bool ink_aio_thread_num_set(int thread_num);
> +#endif
> // AIOCallback::thread special values
> #define AIO_CALLBACK_THREAD_ANY ((EThread*)0) // any regular event
> thread
> #define AIO_CALLBACK_THREAD_AIO ((EThread*)-1)
> @@ -71,12 +177,33 @@ struct AIOCallback: public Continuation
> }
> };
>
> +#if AIO_MODE == AIO_MODE_NATIVE
> +struct DiskHandler: public Continuation
> +{
> + Event *trigger_event;
> + aio_context_t ctx;
> + ink_io_event_t events[MAX_AIO_EVENTS];
> + Que(AIOCallback, link) ready_list;
> + Que(AIOCallback, link) complete_list;
> + int startAIOEvent(int event, Event *e);
> + int mainAIOEvent(int event, Event *e);
> + DiskHandler() {
> + SET_HANDLER(&DiskHandler::startAIOEvent);
> + memset(&ctx, 0, sizeof(aio_context_t));
> + int ret = io_setup(MAX_AIO_EVENTS, &ctx);
> + if (ret < 0) {
> + perror("io_setup error");
> + }
> + }
> +};
> +#endif
> void ink_aio_init(ModuleVersion version);
> int ink_aio_start();
> void ink_aio_set_callback(Continuation * error_callback);
>
> int ink_aio_read(AIOCallback *op, int fromAPI = 0); // fromAPI is
> a boolean to indicate if this is from a API call such as upload
> proxy feature
> int ink_aio_write(AIOCallback *op, int fromAPI = 0);
> -bool ink_aio_thread_num_set(int thread_num);
> +int ink_aio_readv(AIOCallback *op, int fromAPI = 0); // fromAPI is
> a boolean to indicate if this is from a API call such as upload
> proxy feature
> +int ink_aio_writev(AIOCallback *op, int fromAPI = 0);
> AIOCallback *new_AIOCallback(void);
> #endif
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/aio/P_AIO.h
> ----------------------------------------------------------------------
> diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h
> index 2d686e3..9e3321c 100644
> --- a/iocore/aio/P_AIO.h
> +++ b/iocore/aio/P_AIO.h
> @@ -41,6 +41,58 @@
> #define AIO_MODULE_VERSION
> makeModuleVersion(AIO_MODULE_MAJOR_VERSION,\
> AIO_MODULE_MINOR_VERSION,\
> PRIVATE_MODULE_HEADER)
> +
> +TS_INLINE int
> +AIOCallback::ok()
> +{
> + return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
> +}
> +
> +#if AIO_MODE == AIO_MODE_NATIVE
> +
> +extern Continuation *aio_err_callbck;
> +
> +struct AIOCallbackInternal: public AIOCallback
> +{
> + int io_complete(int event, void *data);
> + AIOCallbackInternal()
> + {
> + memset ((char *) &(this->aiocb), 0, sizeof(this->aiocb));
> + SET_HANDLER(&AIOCallbackInternal::io_complete);
> + }
> +};
> +
> +TS_INLINE int
> +AIOCallbackInternal::io_complete(int event, void *data)
> +{
> + (void) event;
> + (void) data;
> +
> + if (!ok() && aio_err_callbck)
> + eventProcessor.schedule_imm(aio_err_callbck, ET_CALL,
> AIO_EVENT_DONE);
> + mutex = action.mutex;
> + MUTEX_LOCK(lock, mutex, this_ethread());
> + if (!action.cancelled)
> + action.continuation->handleEvent(AIO_EVENT_DONE, this);
> + return EVENT_DONE;
> +}
> +
> +TS_INLINE int
> +AIOVec::mainEvent(int event, Event *e) {
> + ++completed;
> + if (completed < size)
> + return EVENT_CONT;
> + else if (completed == size) {
> + MUTEX_LOCK(lock, action.mutex, this_ethread());
> + if (!action.cancelled)
> + action.continuation->handleEvent(AIO_EVENT_DONE, this);
> + delete this;
> + return EVENT_DONE;
> + }
> + ink_assert(!"AIOVec mainEvent err");
> + return EVENT_ERROR;
> +}
> +#else
> struct AIO_Reqs;
>
> struct AIOCallbackInternal: public AIOCallback
> @@ -61,12 +113,6 @@ struct AIOCallbackInternal: public AIOCallback
> };
>
> TS_INLINE int
> -AIOCallback::ok()
> -{
> - return (off_t) aiocb.aio_nbytes == (off_t) aio_result;
> -}
> -
> -TS_INLINE int
> AIOCallbackInternal::io_complete(int event, void *data)
> {
> (void) event;
> @@ -92,6 +138,7 @@ struct AIO_Reqs
> volatile int requests_queued;
> };
>
> +#endif // AIO_MODE == AIO_MODE_NATIVE
> #ifdef AIO_STATS
> class AIOTestData:public Continuation
> {
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/Cache.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/cache/Cache.cc b/iocore/cache/Cache.cc
> index 1232a11..d5ba29f 100644
> --- a/iocore/cache/Cache.cc
> +++ b/iocore/cache/Cache.cc
> @@ -146,6 +146,51 @@ struct VolInitInfo
> }
> };
>
> +#if AIO_MODE == AIO_MODE_NATIVE
> +struct VolInit : public Continuation
> +{
> + Vol *vol;
> + char *path;
> + off_t blocks;
> + int64_t offset;
> + bool vol_clear;
> +
> + int mainEvent(int event, Event *e) {
> + vol->init(path, blocks, offset, vol_clear);
> + mutex.clear();
> + delete this;
> + return EVENT_DONE;
> + }
> +
> + VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) :
> Continuation(v->mutex),
> + vol(v), path(p), blocks(b), offset(o), vol_clear(c) {
> + SET_HANDLER(&VolInit::mainEvent);
> + }
> +};
> +
> +struct DiskInit : public Continuation
> +{
> + CacheDisk *disk;
> + char *s;
> + off_t blocks;
> + off_t askip;
> + int ahw_sector_size;
> + int fildes;
> + bool clear;
> +
> + int mainEvent(int event, Event *e) {
> + disk->open(s, blocks, askip, ahw_sector_size, fildes, clear);
> + mutex.clear();
> + delete this;
> + return EVENT_DONE;
> + }
> +
> + DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector,
> int f, bool c) : Continuation(d->mutex),
> + disk(d), s(str), blocks(b), askip(skip),
> ahw_sector_size(sector), fildes(f), clear(c) {
> + SET_HANDLER(&DiskInit::mainEvent);
> + }
> +};
> +#endif
> void cplist_init();
> static void cplist_update();
> int cplist_reconfigure();
> @@ -530,6 +575,16 @@ CacheProcessor::start_internal(int flags)
> verify_cache_api();
> #endif
>
> +#if AIO_MODE == AIO_MODE_NATIVE
> + int etype = ET_NET;
> + int n_netthreads = eventProcessor.n_threads_for_type[etype];
> + EThread **netthreads = eventProcessor.eventthread[etype];
> + for (int i = 0; i < n_netthreads; ++i) {
> + netthreads[i]->diskHandler = new DiskHandler();
> + netthreads[i]->schedule_imm(netthreads[i]->diskHandler);
> + }
> +#endif
> +
> start_internal_flags = flags;
> clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag;
> fix = !!(flags & PROCESSOR_FIX);
> @@ -593,7 +648,11 @@ CacheProcessor::start_internal(int flags)
> }
> off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ?
> START_POS + sd->alignment : sd->offset));
> blocks = blocks - ROUND_TO_STORE_BLOCK(sd->offset + skip);
> +#if AIO_MODE == AIO_MODE_NATIVE
> + eventProcessor.schedule_imm(NEW(new
> DiskInit(gdisks[gndisks], path, blocks, skip, sector_size, fd,
> clear)));
> +#else
> gdisks[gndisks]->open(path, blocks, skip, sector_size, fd,
> clear);
> +#endif
> gndisks++;
> }
> } else {
> @@ -1109,8 +1168,11 @@ Vol::init(char *s, off_t blocks, off_t
> dir_skip, bool clear)
> aio->thread = AIO_CALLBACK_THREAD_ANY;
> aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : 0;
> }
> -
> +#if AIO_MODE == AIO_MODE_NATIVE
> + ink_assert(ink_aio_readv(init_info->vol_aio));
> +#else
> ink_assert(ink_aio_read(init_info->vol_aio));
> +#endif
> return 0;
> }
>
> @@ -1440,7 +1502,11 @@ Ldone:{
> init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen -
> footerlen;
>
> SET_HANDLER(&Vol::handle_recover_write_dir);
> +#if AIO_MODE == AIO_MODE_NATIVE
> + ink_assert(ink_aio_writev(init_info->vol_aio));
> +#else
> ink_assert(ink_aio_write(init_info->vol_aio));
> +#endif
> return EVENT_CONT;
> }
>
> @@ -1812,7 +1878,11 @@ Cache::open(bool clear, bool fix) {
> blocks = q->b->len;
>
> bool vol_clear = clear || d->cleared || q->new_block;
> +#if AIO_MODE == AIO_MODE_NATIVE
> + eventProcessor.schedule_imm(NEW(new
> VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset,
> vol_clear)));
> +#else
> cp->vols[vol_no]->init(d->path, blocks, q->b->offset,
> vol_clear);
> +#endif
> vol_no++;
> cache_size += blocks;
> }
> @@ -1926,7 +1996,7 @@ CacheVC::handleReadDone(int event, Event *e) {
> if (checksum != doc->checksum) {
> Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "]
> len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
> doc->first_key.b[0], doc->first_key.b[1],
> - doc->len, doc->hlen, vol->path, io.aiocb.aio_offset,
> io.aiocb.aio_nbytes);
> + doc->len, doc->hlen, vol->path, io.aiocb.aio_offset,
> (size_t)io.aiocb.aio_nbytes);
> doc->magic = DOC_CORRUPT;
> okay = 0;
> }
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/iocore/cache/CacheVol.cc
> ----------------------------------------------------------------------
> diff --git a/iocore/cache/CacheVol.cc b/iocore/cache/CacheVol.cc
> index bf4fad9..ccd2009 100644
> --- a/iocore/cache/CacheVol.cc
> +++ b/iocore/cache/CacheVol.cc
> @@ -365,7 +365,7 @@ Lread:
> offset = 0;
> ink_assert(ink_aio_read(&io) >= 0);
> Debug("cache_scan_truss", "read %p:scanObject %" PRId64 " %zu",
> this,
> - (int64_t)io.aiocb.aio_offset, io.aiocb.aio_nbytes);
> + (int64_t)io.aiocb.aio_offset, (size_t)io.aiocb.aio_nbytes);
> return EVENT_CONT;
>
> Ldone:
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_aiocb.h
> ----------------------------------------------------------------------
> diff --git a/lib/ts/ink_aiocb.h b/lib/ts/ink_aiocb.h
> index 1cf9d38..543243b 100644
> --- a/lib/ts/ink_aiocb.h
> +++ b/lib/ts/ink_aiocb.h
> @@ -39,7 +39,7 @@
> #define LIO_READ 0x1
> #define LIO_WRITE 0x2
>
> -typedef struct ink_aiocb
> +struct ink_aiocb
> {
> int aio_fildes;
> #if defined(__STDC__)
> @@ -58,6 +58,6 @@ typedef struct ink_aiocb
> // aio_result_t aio_resultp; /* results */
> int aio_state; /* state flag for List I/O */
> int aio__pad[1]; /* extension padding */
> -} ink_aiocb_t;
> +};
>
> #endif
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/lib/ts/ink_config.h.in
> ----------------------------------------------------------------------
> diff --git a/lib/ts/ink_config.h.in b/lib/ts/ink_config.h.in
> index 6b0525c..93e393c 100644
> --- a/lib/ts/ink_config.h.in
> +++ b/lib/ts/ink_config.h.in
> @@ -122,6 +122,7 @@
> #define TS_USE_RECLAIMABLE_FREELIST @use_reclaimable_freelist@
> #define TS_USE_TLS_NPN @use_tls_npn@
> #define TS_USE_TLS_SNI @use_tls_sni@
> +#define TS_USE_LINUX_NATIVE_AIO @use_linux_native_aio@
>
> /* OS API definitions */
> #define GETHOSTBYNAME_R_HOSTENT_DATA
> @gethostbyname_r_hostent_data@
>
> http://git-wip-us.apache.org/repos/asf/trafficserver/blob/c95298df/proxy/InkAPI.cc
> ----------------------------------------------------------------------
> diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc
> index 661a2d0..fd8258e 100644
> --- a/proxy/InkAPI.cc
> +++ b/proxy/InkAPI.cc
> @@ -7324,10 +7324,14 @@ TSAIOWrite(int fd, off_t offset, char* buf,
> const size_t bufSize, TSCont contp)
> TSReturnCode
> TSAIOThreadNumSet(int thread_num)
> {
> +#if AIO_MODE == AIO_MODE_NATIVE
> + return TS_SUCCESS;
> +#else
> if (ink_aio_thread_num_set(thread_num))
> return TS_SUCCESS;
>
> return TS_ERROR;
> +#endif
> }
>
> void
>
>
--
Igor Galić
Tel: +43 (0) 664 886 22 883
Mail: i.galic@brainsware.org
URL: http://brainsware.org/
GPG: 6880 4155 74BD FD7C B515 2EA5 4B1D 9E08 A097 C9AE