You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by bc...@apache.org on 2010/06/10 01:24:15 UTC
svn commit: r953188 - in /trafficserver/traffic/trunk: iocore/aio/AIO.cc
iocore/aio/I_AIO.h proxy/InkAPI.cc proxy/api/ts/ts.h
Author: bcall
Date: Wed Jun 9 23:24:15 2010
New Revision: 953188
URL: http://svn.apache.org/viewvc?rev=953188&view=rev
Log:
TS-387 Adds APIs for aio disk read and writes using the internal aio
support in iocore.
Author: Wendy Huang
Review and minor updates: Bryan Call
Modified:
trafficserver/traffic/trunk/iocore/aio/AIO.cc
trafficserver/traffic/trunk/iocore/aio/I_AIO.h
trafficserver/traffic/trunk/proxy/InkAPI.cc
trafficserver/traffic/trunk/proxy/api/ts/ts.h
Modified: trafficserver/traffic/trunk/iocore/aio/AIO.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/aio/AIO.cc?rev=953188&r1=953187&r2=953188&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/aio/AIO.cc (original)
+++ trafficserver/traffic/trunk/iocore/aio/AIO.cc Wed Jun 9 23:24:15 2010
@@ -35,13 +35,16 @@ int ts_config_with_inkdiskio = 0;
/* structure to hold information about each file descriptor */
AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
/* number of unique file descriptors in the aio_reqs array */
-volatile int num_filedes = 0;
+volatile int num_filedes = 1;
RecRawStatBlock *aio_rsb = NULL;
// acquire this mutex before inserting a new entry in the aio_reqs array.
// Don't need to acquire this for searching the array
static ink_mutex insert_mutex;
Continuation *aio_err_callbck = 0;
RecInt cache_config_threads_per_disk = 12;
+RecInt api_config_threads_per_disk = 12;
+int thread_is_created = 0;
+
// AIO Stats
uint64 aio_num_read = 0;
@@ -111,7 +114,8 @@ AIOTestData::ink_aio_stats(int event, vo
{
ink_hrtime now = ink_get_hrtime();
double time_msec = (double) (now - start) / (double) HRTIME_MSECOND;
- for (int i = 0; i < num_filedes; i++)
+ int i = (aio_reqs[0] == NULL)? 1 : 0;
+ for (; i < num_filedes; i++) {
printf("%0.2f\t%i\t%i\t%i\n", time_msec, aio_reqs[i]->filedes, aio_reqs[i]->pending, aio_reqs[i]->queued);
printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);
eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
@@ -205,7 +209,7 @@ struct AIOThreadInfo:public Continuation
/* insert an entry for file descriptor fildes into aio_reqs */
static AIO_Reqs *
-aio_init_fildes(int fildes)
+aio_init_fildes(int fildes, int fromAPI = 0)
{
int i;
AIO_Reqs *request = (AIO_Reqs *) malloc(sizeof(AIO_Reqs));
@@ -217,15 +221,25 @@ aio_init_fildes(int fildes)
ink_mutex_init(&request->aio_mutex, NULL);
ink_atomiclist_init(&request->aio_temp_list, "temp_list", (uintptr_t) &((AIOCallback *) 0)->link);
- request->index = num_filedes;
- request->filedes = fildes;
+ RecInt thread_num;
- aio_reqs[num_filedes] = request;
+ if (fromAPI) {
+ request->index = 0;
+ request->filedes = -1;
+ aio_reqs[0] = request;
+ thread_is_created = 1;
+ thread_num = api_config_threads_per_disk;
+ } else {
+ request->index = num_filedes;
+ request->filedes = fildes;
+ aio_reqs[num_filedes] = request;
+ thread_num = cache_config_threads_per_disk;
+ }
/* create the main thread */
AIOThreadInfo *thr_info;
- for (i = 0; i < cache_config_threads_per_disk; i++) {
- if (i == (cache_config_threads_per_disk - 1))
+ for (i = 0; i < thread_num; i++) {
+ if (i == (thread_num - 1))
thr_info = new AIOThreadInfo(request, 1);
else
thr_info = new AIOThreadInfo(request, 0);
@@ -234,7 +248,9 @@ aio_init_fildes(int fildes)
/* the num_filedes should be incremented after initializing everything.
This prevents a thread from looking at uninitialized fields */
- num_filedes++;
+ if (!fromAPI) {
+ num_filedes++;
+ }
return request;
}
@@ -296,16 +312,16 @@ aio_move(AIO_Reqs *req)
/* queue the new request */
static void
-aio_queue_req(AIOCallbackInternal *op)
+aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
{
- int thread_ndx = 0;
+ int thread_ndx = 1;
AIO_Reqs *req = op->aio_req;
op->link.next = NULL;;
op->link.prev = NULL;
#ifdef AIO_STATS
ink_atomic_increment((int *) &data->num_req, 1);
#endif
- if (!req || req->filedes != op->aiocb.aio_fildes) {
+ if (!fromAPI && (!req || req->filedes != op->aiocb.aio_fildes)) {
/* search for the matching file descriptor */
for (; thread_ndx < num_filedes; thread_ndx++) {
if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
@@ -324,7 +340,7 @@ aio_queue_req(AIOCallbackInternal *op)
aio_reqs and acquired the mutex. check the aio_reqs array to
make sure the entry inserted does not correspond to the current
file descriptor */
- for (thread_ndx = 0; thread_ndx < num_filedes; thread_ndx++) {
+ for (thread_ndx = 1; thread_ndx < num_filedes; thread_ndx++) {
if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
req = aio_reqs[thread_ndx];
break;
@@ -337,6 +353,16 @@ aio_queue_req(AIOCallbackInternal *op)
}
op->aio_req = req;
}
+ if (fromAPI && (!req || req->filedes != -1)) {
+ ink_mutex_acquire(&insert_mutex);
+ if (aio_reqs[0] == NULL) {
+ req = aio_init_fildes(-1, 1);
+ } else {
+ req = aio_reqs[0];
+ }
+ ink_mutex_release(&insert_mutex);
+ op->aio_req = req;
+ }
ink_atomic_increment(&req->requests_queued, 1);
if (!ink_mutex_try_acquire(&req->aio_mutex)) {
#ifdef AIO_STATS
@@ -387,7 +413,7 @@ cache_op(AIOCallbackInternal *op)
}
int
-ink_aio_read(AIOCallback *op)
+ink_aio_read(AIOCallback *op, int fromAPI)
{
op->aiocb.aio_lio_opcode = LIO_READ;
switch (AIO_MODE) {
@@ -407,14 +433,14 @@ ink_aio_read(AIOCallback *op)
op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
break;
case AIO_MODE_THREAD:
- aio_queue_req((AIOCallbackInternal *) op);
+ aio_queue_req((AIOCallbackInternal *) op, fromAPI);
break;
}
return 1;
}
int
-ink_aio_write(AIOCallback *op)
+ink_aio_write(AIOCallback *op, int fromAPI)
{
op->aiocb.aio_lio_opcode = LIO_WRITE;
switch (AIO_MODE) {
@@ -434,12 +460,23 @@ ink_aio_write(AIOCallback *op)
op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
break;
case AIO_MODE_THREAD:
- aio_queue_req((AIOCallbackInternal *) op);
+ aio_queue_req((AIOCallbackInternal *) op, fromAPI);
break;
}
return 1;
}
+int
+ink_aio_thread_num_set(int thread_num)
+{
+ if (thread_num > 0 && !thread_is_created) {
+ api_config_threads_per_disk = thread_num;
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
void *
aio_thread_main(void *arg)
{
Modified: trafficserver/traffic/trunk/iocore/aio/I_AIO.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/aio/I_AIO.h?rev=953188&r1=953187&r2=953188&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/aio/I_AIO.h (original)
+++ trafficserver/traffic/trunk/iocore/aio/I_AIO.h Wed Jun 9 23:24:15 2010
@@ -79,7 +79,8 @@ void ink_aio_init(ModuleVersion version)
int ink_aio_start();
void ink_aio_set_callback(Continuation * error_callback);
-int ink_aio_read(AIOCallback * op);
-int ink_aio_write(AIOCallback * op);
+int ink_aio_read(AIOCallback *op, int fromAPI = 0); // fromAPI is a boolean to indicate if this is from a API call such as upload proxy feature
+int ink_aio_write(AIOCallback *op, int fromAPI = 0);
+int ink_aio_thread_num_set(int thread_num);
AIOCallback *new_AIOCallback(void);
#endif
Modified: trafficserver/traffic/trunk/proxy/InkAPI.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/proxy/InkAPI.cc?rev=953188&r1=953187&r2=953188&view=diff
==============================================================================
--- trafficserver/traffic/trunk/proxy/InkAPI.cc (original)
+++ trafficserver/traffic/trunk/proxy/InkAPI.cc Wed Jun 9 23:24:15 2010
@@ -60,6 +60,7 @@
#include "FetchSM.h"
#include "StatSystemV2.h"
#include "HttpDebugNames.h"
+#include "I_AIO.h"
/****************************************************************
* IMPORTANT - READ ME
@@ -9130,4 +9131,89 @@ INKHttpIsInternalRequest(INKHttpTxn txnp
return vc->get_is_internal_request();
}
+
+INKReturnCode
+INKAIORead(int fd, INKU64 offset, char* buf, INKU64 buffSize, INKCont contp)
+{
+
+ if (sdk_sanity_check_iocore_structure (contp) != INK_SUCCESS) {
+ return INK_ERROR;
+ }
+
+ Continuation* pCont = (Continuation*) contp;
+ AIOCallback* pAIO = new_AIOCallback();
+ if( pAIO == NULL ) {
+ return INK_ERROR;
+ }
+
+ pAIO->aiocb.aio_fildes = fd;
+ pAIO->aiocb.aio_offset = offset;
+ pAIO->aiocb.aio_nbytes = buffSize;
+
+
+ pAIO->aiocb.aio_buf = buf;
+ pAIO->action = pCont;
+ pAIO->thread = ((ProxyMutex*) pCont->mutex)->thread_holding;
+
+ if (ink_aio_read(pAIO, 1) == 1) {
+ return INK_SUCCESS;
+ } else {
+ return INK_ERROR;
+ }
+}
+
+char*
+INKAIOBufGet(void *data)
+{
+ AIOCallback* pAIO = (AIOCallback*)data;
+ return (char*)pAIO->aiocb.aio_buf;
+}
+
+int
+INKAIONBytesGet(void *data)
+{
+ AIOCallback* pAIO = (AIOCallback*)data;
+ return (int)pAIO->aio_result;
+}
+
+INKReturnCode
+INKAIOWrite(int fd, INKU64 offset, char* buf, const INKU64 bufSize, INKCont contp)
+{
+
+ if (sdk_sanity_check_iocore_structure (contp) != INK_SUCCESS) {
+ return INK_ERROR;
+ }
+
+ Continuation* pCont = (Continuation*) contp;
+
+ AIOCallback* pAIO = new_AIOCallback();
+ if( pAIO == NULL ) {
+ return INK_ERROR;
+ }
+
+ pAIO->aiocb.aio_fildes = fd;
+ pAIO->aiocb.aio_offset = offset;
+ pAIO->aiocb.aio_buf = buf;
+ pAIO->aiocb.aio_nbytes = bufSize;
+ pAIO->action = pCont;
+ pAIO->thread = ((ProxyMutex*) pCont->mutex)->thread_holding;
+
+ if (ink_aio_write(pAIO, 1) == 1) {
+ return INK_SUCCESS;
+ } else {
+ return INK_ERROR;
+ }
+}
+
+INKReturnCode
+INKAIOThreadNumSet(int thread_num)
+{
+ if (ink_aio_thread_num_set(thread_num) == 1) {
+ return INK_SUCCESS;
+ }
+ else {
+ return INK_ERROR;
+ }
+}
+
#endif //INK_NO_API
Modified: trafficserver/traffic/trunk/proxy/api/ts/ts.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/proxy/api/ts/ts.h?rev=953188&r1=953187&r2=953188&view=diff
==============================================================================
--- trafficserver/traffic/trunk/proxy/api/ts/ts.h (original)
+++ trafficserver/traffic/trunk/proxy/api/ts/ts.h Wed Jun 9 23:24:15 2010
@@ -289,6 +289,9 @@ extern "C"
/* EVENT 1200 for internal use */
INK_EVENT_INTERNAL_1200 = 1200,
+ /* EVENT 3900 is corresponding to event AIO_EVENT_DONE defined in I_AIO.h */
+ INK_AIO_EVENT_DONE = 3900,
+
INK_EVENT_HTTP_CONTINUE = 60000,
INK_EVENT_HTTP_ERROR = 60001,
INK_EVENT_HTTP_READ_REQUEST_HDR = 60002,
@@ -2581,6 +2584,41 @@ extern "C"
*/
inkapi INKReturnCode INKTextLogObjectRollingOffsetHrSet(INKTextLogObject the_object, int rolling_offset_hr);
+ /**
+ Async disk IO read
+
+ @return INK_SUCCESS or INK_ERROR.
+ */
+ inkapi INKReturnCode INKAIORead(int fd, INKU64 offset, char* buf, INKU64 buffSize, INKCont contp);
+
+ /**
+ Async disk IO buffer get
+
+ @return char* to the buffer
+ */
+ inkapi char* INKAIOBufGet(void* data);
+
+ /**
+ Async disk IO get number of bytes
+
+ @return the number of bytes
+ */
+ inkapi int INKAIONBytesGet(void* data);
+
+ /**
+ Async disk IO write
+
+ @return INK_SUCCESS or INK_ERROR.
+ */
+ inkapi INKReturnCode INKAIOWrite(int fd, INKU64 offset, char* buf, const INKU64 bufSize, INKCont contp);
+
+ /**
+ Async disk IO set number of threads
+
+ @return INK_SUCCESS or INK_ERROR.
+ */
+ inkapi INKReturnCode INKAIOThreadNumSet(int thread_num);
+
/* --------------------------------------------------------------------------
Deprecated Functions
Use of the following functions is strongly discouraged. These