You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafficserver.apache.org by bc...@apache.org on 2010/06/10 01:24:15 UTC

svn commit: r953188 - in /trafficserver/traffic/trunk: iocore/aio/AIO.cc iocore/aio/I_AIO.h proxy/InkAPI.cc proxy/api/ts/ts.h

Author: bcall
Date: Wed Jun  9 23:24:15 2010
New Revision: 953188

URL: http://svn.apache.org/viewvc?rev=953188&view=rev
Log:
TS-387 Adds APIs for aio disk read and writes using the internal aio
support in iocore.

Author: Wendy Huang
Review and minor updates: Bryan Call

Modified:
    trafficserver/traffic/trunk/iocore/aio/AIO.cc
    trafficserver/traffic/trunk/iocore/aio/I_AIO.h
    trafficserver/traffic/trunk/proxy/InkAPI.cc
    trafficserver/traffic/trunk/proxy/api/ts/ts.h

Modified: trafficserver/traffic/trunk/iocore/aio/AIO.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/aio/AIO.cc?rev=953188&r1=953187&r2=953188&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/aio/AIO.cc (original)
+++ trafficserver/traffic/trunk/iocore/aio/AIO.cc Wed Jun  9 23:24:15 2010
@@ -35,13 +35,16 @@ int ts_config_with_inkdiskio = 0;
 /* structure to hold information about each file descriptor */
 AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
 /* number of unique file descriptors in the aio_reqs array */
-volatile int num_filedes = 0;
+volatile int num_filedes = 1;
 RecRawStatBlock *aio_rsb = NULL;
 // acquire this mutex before inserting a new entry in the aio_reqs array.
 // Don't need to acquire this for searching the array
 static ink_mutex insert_mutex;
 Continuation *aio_err_callbck = 0;
 RecInt cache_config_threads_per_disk = 12;
+RecInt api_config_threads_per_disk = 12;
+int thread_is_created = 0;
+
 
 // AIO Stats
 uint64 aio_num_read = 0;
@@ -111,7 +114,8 @@ AIOTestData::ink_aio_stats(int event, vo
 {
   ink_hrtime now = ink_get_hrtime();
   double time_msec = (double) (now - start) / (double) HRTIME_MSECOND;
-  for (int i = 0; i < num_filedes; i++)
+  int i = (aio_reqs[0] == NULL)? 1 : 0;
+  for (; i < num_filedes; i++) {
     printf("%0.2f\t%i\t%i\t%i\n", time_msec, aio_reqs[i]->filedes, aio_reqs[i]->pending, aio_reqs[i]->queued);
   printf("Num Requests: %i Num Queued: %i num Moved: %i\n\n", data->num_req, data->num_queue, data->num_temp);
   eventProcessor.schedule_in(this, HRTIME_MSECONDS(50), ET_CALL);
@@ -205,7 +209,7 @@ struct AIOThreadInfo:public Continuation
 
 /* insert  an entry for file descriptor fildes into aio_reqs */
 static AIO_Reqs *
-aio_init_fildes(int fildes)
+aio_init_fildes(int fildes, int fromAPI = 0)
 {
   int i;
   AIO_Reqs *request = (AIO_Reqs *) malloc(sizeof(AIO_Reqs));
@@ -217,15 +221,25 @@ aio_init_fildes(int fildes)
   ink_mutex_init(&request->aio_mutex, NULL);
   ink_atomiclist_init(&request->aio_temp_list, "temp_list", (uintptr_t) &((AIOCallback *) 0)->link);
 
-  request->index = num_filedes;
-  request->filedes = fildes;
+  RecInt thread_num;
 
-  aio_reqs[num_filedes] = request;
+  if (fromAPI) {
+    request->index = 0;
+    request->filedes = -1;
+    aio_reqs[0] = request;
+    thread_is_created = 1;
+    thread_num = api_config_threads_per_disk;
+  } else {
+    request->index = num_filedes;
+    request->filedes = fildes;
+    aio_reqs[num_filedes] = request;
+    thread_num = cache_config_threads_per_disk;
+  }
 
   /* create the main thread */
   AIOThreadInfo *thr_info;
-  for (i = 0; i < cache_config_threads_per_disk; i++) {
-    if (i == (cache_config_threads_per_disk - 1))
+  for (i = 0; i < thread_num; i++) {
+    if (i == (thread_num - 1))
       thr_info = new AIOThreadInfo(request, 1);
     else
       thr_info = new AIOThreadInfo(request, 0);
@@ -234,7 +248,9 @@ aio_init_fildes(int fildes)
 
   /* the num_filedes should be incremented after initializing everything.
      This prevents a thread from looking at uninitialized fields */
-  num_filedes++;
+  if (!fromAPI) {
+    num_filedes++;
+  }
   return request;
 }
 
@@ -296,16 +312,16 @@ aio_move(AIO_Reqs *req)
 
 /* queue the new request */
 static void
-aio_queue_req(AIOCallbackInternal *op)
+aio_queue_req(AIOCallbackInternal *op, int fromAPI = 0)
 {
-  int thread_ndx = 0;
+  int thread_ndx = 1;
   AIO_Reqs *req = op->aio_req;
   op->link.next = NULL;;
   op->link.prev = NULL;
 #ifdef AIO_STATS
   ink_atomic_increment((int *) &data->num_req, 1);
 #endif
-  if (!req || req->filedes != op->aiocb.aio_fildes) {
+  if (!fromAPI && (!req || req->filedes != op->aiocb.aio_fildes)) {
     /* search for the matching file descriptor */
     for (; thread_ndx < num_filedes; thread_ndx++) {
       if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
@@ -324,7 +340,7 @@ aio_queue_req(AIOCallbackInternal *op)
            aio_reqs and acquired the mutex. check the aio_reqs array to
            make sure the entry inserted does not correspond  to the current
            file descriptor */
-        for (thread_ndx = 0; thread_ndx < num_filedes; thread_ndx++) {
+        for (thread_ndx = 1; thread_ndx < num_filedes; thread_ndx++) {
           if (aio_reqs[thread_ndx]->filedes == op->aiocb.aio_fildes) {
             req = aio_reqs[thread_ndx];
             break;
@@ -337,6 +353,16 @@ aio_queue_req(AIOCallbackInternal *op)
     }
     op->aio_req = req;
   }
+  if (fromAPI && (!req || req->filedes != -1)) {
+    ink_mutex_acquire(&insert_mutex);
+    if (aio_reqs[0] == NULL) {
+      req = aio_init_fildes(-1, 1);
+    } else {
+      req = aio_reqs[0];
+    }
+    ink_mutex_release(&insert_mutex);
+    op->aio_req = req;
+  }
   ink_atomic_increment(&req->requests_queued, 1);
   if (!ink_mutex_try_acquire(&req->aio_mutex)) {
 #ifdef AIO_STATS
@@ -387,7 +413,7 @@ cache_op(AIOCallbackInternal *op)
 }
 
 int
-ink_aio_read(AIOCallback *op)
+ink_aio_read(AIOCallback *op, int fromAPI)
 {
   op->aiocb.aio_lio_opcode = LIO_READ;
   switch (AIO_MODE) {
@@ -407,14 +433,14 @@ ink_aio_read(AIOCallback *op)
     op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
     break;
   case AIO_MODE_THREAD:
-    aio_queue_req((AIOCallbackInternal *) op);
+    aio_queue_req((AIOCallbackInternal *) op, fromAPI);
     break;
   }
   return 1;
 }
 
 int
-ink_aio_write(AIOCallback *op)
+ink_aio_write(AIOCallback *op, int fromAPI)
 {
   op->aiocb.aio_lio_opcode = LIO_WRITE;
   switch (AIO_MODE) {
@@ -434,12 +460,23 @@ ink_aio_write(AIOCallback *op)
     op->action.continuation->handleEvent(AIO_EVENT_DONE, op);
     break;
   case AIO_MODE_THREAD:
-    aio_queue_req((AIOCallbackInternal *) op);
+    aio_queue_req((AIOCallbackInternal *) op, fromAPI);
     break;
   }
   return 1;
 }
 
+int
+ink_aio_thread_num_set(int thread_num)
+{
+  if (thread_num > 0 && !thread_is_created) {
+    api_config_threads_per_disk = thread_num;
+    return 1;
+  } else {
+    return 0;
+  }
+}
+
 void *
 aio_thread_main(void *arg)
 {

Modified: trafficserver/traffic/trunk/iocore/aio/I_AIO.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/iocore/aio/I_AIO.h?rev=953188&r1=953187&r2=953188&view=diff
==============================================================================
--- trafficserver/traffic/trunk/iocore/aio/I_AIO.h (original)
+++ trafficserver/traffic/trunk/iocore/aio/I_AIO.h Wed Jun  9 23:24:15 2010
@@ -79,7 +79,8 @@ void ink_aio_init(ModuleVersion version)
 int ink_aio_start();
 void ink_aio_set_callback(Continuation * error_callback);
 
-int ink_aio_read(AIOCallback * op);
-int ink_aio_write(AIOCallback * op);
+int ink_aio_read(AIOCallback *op, int fromAPI = 0);   // fromAPI is a boolean to indicate if this is from a API call such as upload proxy feature
+int ink_aio_write(AIOCallback *op, int fromAPI = 0);
+int ink_aio_thread_num_set(int thread_num);
 AIOCallback *new_AIOCallback(void);
 #endif

Modified: trafficserver/traffic/trunk/proxy/InkAPI.cc
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/proxy/InkAPI.cc?rev=953188&r1=953187&r2=953188&view=diff
==============================================================================
--- trafficserver/traffic/trunk/proxy/InkAPI.cc (original)
+++ trafficserver/traffic/trunk/proxy/InkAPI.cc Wed Jun  9 23:24:15 2010
@@ -60,6 +60,7 @@
 #include "FetchSM.h"
 #include "StatSystemV2.h"
 #include "HttpDebugNames.h"
+#include "I_AIO.h"
 
 /****************************************************************
  *  IMPORTANT - READ ME
@@ -9130,4 +9131,89 @@ INKHttpIsInternalRequest(INKHttpTxn txnp
   return vc->get_is_internal_request();
 }
 
+
+INKReturnCode
+INKAIORead(int fd, INKU64 offset, char* buf, INKU64 buffSize, INKCont contp)
+{
+
+  if (sdk_sanity_check_iocore_structure (contp) != INK_SUCCESS) {
+    return INK_ERROR;
+  }
+
+  Continuation* pCont = (Continuation*) contp;
+  AIOCallback* pAIO = new_AIOCallback();
+  if( pAIO == NULL ) {
+    return INK_ERROR;
+  }
+
+  pAIO->aiocb.aio_fildes = fd;
+  pAIO->aiocb.aio_offset = offset;
+  pAIO->aiocb.aio_nbytes = buffSize;
+
+
+  pAIO->aiocb.aio_buf = buf;
+  pAIO->action = pCont;
+  pAIO->thread = ((ProxyMutex*) pCont->mutex)->thread_holding;
+
+  if (ink_aio_read(pAIO, 1) == 1) {
+    return INK_SUCCESS;
+  } else {
+    return INK_ERROR;
+  }
+}
+
+char*
+INKAIOBufGet(void *data)
+{
+  AIOCallback* pAIO = (AIOCallback*)data;
+  return (char*)pAIO->aiocb.aio_buf;
+}
+
+int
+INKAIONBytesGet(void *data)
+{
+  AIOCallback* pAIO = (AIOCallback*)data;
+  return (int)pAIO->aio_result;
+}
+
+INKReturnCode
+INKAIOWrite(int fd, INKU64 offset, char* buf, const INKU64 bufSize, INKCont contp)
+{
+
+  if (sdk_sanity_check_iocore_structure (contp) != INK_SUCCESS) {
+    return INK_ERROR;
+  }
+
+  Continuation* pCont = (Continuation*) contp;
+
+  AIOCallback* pAIO = new_AIOCallback();
+  if( pAIO == NULL ) {
+    return INK_ERROR;
+  }
+
+  pAIO->aiocb.aio_fildes = fd;
+  pAIO->aiocb.aio_offset = offset;
+  pAIO->aiocb.aio_buf = buf;
+  pAIO->aiocb.aio_nbytes = bufSize;
+  pAIO->action = pCont;
+  pAIO->thread = ((ProxyMutex*) pCont->mutex)->thread_holding;
+
+  if (ink_aio_write(pAIO, 1) == 1) {
+    return INK_SUCCESS;
+  } else {
+    return INK_ERROR;
+  }
+}
+
+INKReturnCode
+INKAIOThreadNumSet(int thread_num)
+{
+  if (ink_aio_thread_num_set(thread_num) == 1) {
+    return INK_SUCCESS;
+  }
+  else {
+    return INK_ERROR;
+  }
+}
+
 #endif //INK_NO_API

Modified: trafficserver/traffic/trunk/proxy/api/ts/ts.h
URL: http://svn.apache.org/viewvc/trafficserver/traffic/trunk/proxy/api/ts/ts.h?rev=953188&r1=953187&r2=953188&view=diff
==============================================================================
--- trafficserver/traffic/trunk/proxy/api/ts/ts.h (original)
+++ trafficserver/traffic/trunk/proxy/api/ts/ts.h Wed Jun  9 23:24:15 2010
@@ -289,6 +289,9 @@ extern "C"
     /* EVENT 1200 for internal use */
     INK_EVENT_INTERNAL_1200 = 1200,
 
+    /* EVENT 3900 is corresponding to event AIO_EVENT_DONE defined in I_AIO.h */
+    INK_AIO_EVENT_DONE = 3900,
+
     INK_EVENT_HTTP_CONTINUE = 60000,
     INK_EVENT_HTTP_ERROR = 60001,
     INK_EVENT_HTTP_READ_REQUEST_HDR = 60002,
@@ -2581,6 +2584,41 @@ extern "C"
    */
   inkapi INKReturnCode INKTextLogObjectRollingOffsetHrSet(INKTextLogObject the_object, int rolling_offset_hr);
 
+  /**
+      Async disk IO read
+
+      @return INK_SUCCESS or INK_ERROR.
+   */
+  inkapi INKReturnCode INKAIORead(int fd, INKU64 offset, char* buf, INKU64 buffSize, INKCont contp);
+
+  /**
+      Async disk IO buffer get
+
+      @return char* to the buffer
+   */
+  inkapi char* INKAIOBufGet(void* data);
+
+  /**
+      Async disk IO get number of bytes
+
+      @return the number of bytes
+   */
+  inkapi int INKAIONBytesGet(void* data);
+
+  /**
+      Async disk IO write
+
+      @return INK_SUCCESS or INK_ERROR.
+   */
+  inkapi INKReturnCode INKAIOWrite(int fd, INKU64 offset, char* buf, const INKU64 bufSize, INKCont contp);
+
+  /**
+      Async disk IO set number of threads
+
+      @return INK_SUCCESS or INK_ERROR.
+   */
+  inkapi INKReturnCode INKAIOThreadNumSet(int thread_num);
+
   /* --------------------------------------------------------------------------
      Deprecated Functions
      Use of the following functions is strongly discouraged. These