Posted to dev@mina.apache.org by Mike Heath <mh...@apache.org> on 2007/02/02 22:49:44 UTC

Re: Is there someone trying the AsynchronousFileIO?

I've been looking over your AIO stuff in more detail and I think part
of your performance problem might be that you're not opening the file
with the O_DIRECT flag set.  I would be very interested to see what
performance difference setting this flag makes.
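
Roughly what I have in mind, as an untested sketch (the path and buffer
sizes are just placeholders; O_DIRECT needs the buffer, offset and length
aligned, typically to the 512-byte logical block size):

#define _GNU_SOURCE     /* O_DIRECT is a GNU/Linux extension */
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

int main(void) {
    int fd = open("/tmp/hugeFile.txt", O_RDONLY | O_DIRECT);
    if (fd < 0) { perror("open"); return 1; }

    void *buf;
    /* direct I/O requires an aligned buffer; posix_memalign provides one */
    if (posix_memalign(&buf, 512, 4096) != 0) {
        fprintf(stderr, "posix_memalign failed\n");
        return 1;
    }

    ssize_t n = pread(fd, buf, 4096, 0);   /* aligned length and offset */
    if (n < 0) perror("pread");
    else printf("read %zd bytes\n", n);

    free(buf);
    close(fd);
    return 0;
}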

-Mike

On 1/17/07, yueyu lin <po...@gmail.com> wrote:
> OK, I finally implement a stable version based on the signal call in
> aio_read .
> Here is the source codes:
> The header:
> /* DO NOT EDIT THIS FILE - it is machine generated */
> #include <jni.h>
> /* Header for class com_foo_io_AsyRandomAccessFile */
>
> #ifndef _Included_com_foo_io_AsyRandomAccessFile
> #define _Included_com_foo_io_AsyRandomAccessFile
> #ifdef __cplusplus
> extern "C" {
> #endif
> /*
>  * Class:     com_foo_io_AsyRandomAccessFile
>  * Method:    read
>  * Signature: (IIJI[BLcom/foo/io/AsyReadRequest;)V
>  */
> JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_read
>   (JNIEnv *, jclass, jint, jint, jlong, jint, jbyteArray, jobject);
>
> /*
>  * Class:     com_foo_io_AsyRandomAccessFile
>  * Method:    openFd
>  * Signature: (Ljava/lang/String;I)I
>  */
> JNIEXPORT jint JNICALL Java_com_foo_io_AsyRandomAccessFile_openFd
>   (JNIEnv *, jclass, jstring, jint);
>
> /*
>  * Class:     com_foo_io_AsyRandomAccessFile
>  * Method:    close
>  * Signature: ()V
>  */
> JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_close
>   (JNIEnv *, jobject);
>
> #ifdef __cplusplus
> }
> #endif
> #endif
>
> The C source code:
> #include <fcntl.h>
> #include <unistd.h>
> #include <aio.h>
> #include <stdlib.h>
> #include <errno.h>
> #include <sys/stat.h>
> #include <pthread.h>
> #include "com_foo_io_AsyRandomAccessFile.h"
> #define ASYRAF_READ 0
> #define ASYRAF_WRITE 1
> #define ASYRAF_RW 2
> #define SIGNAL_CALL_BACK
> #define MAX_BUF 8092
> #define NO_LIST_DEBUG
> //The signal number to use.
> #define SIG_AIO SIGRTMIN+5
> struct _payload {
>     jobject req;
>     jbyteArray buf ;
>     jint offset;
>     struct aiocb *my_aiocb;
> };
>
> typedef struct _payload payload;
> payload empty_payload;
>
> struct _list_point{
>     payload *value;
>     struct _list_point *prev;
>     struct _list_point *next;
> };
> typedef struct _list_point list_point;
> struct _payload_list{
>     int size ;
>     list_point *head;
>     list_point *tail;
> };
> typedef struct _payload_list payload_list;
>
>
> static JavaVM *jvm;
> static jmethodID getFdMethodId ;
> static jmethodID notifyClientMethodId ;
> static jclass asyRafClass ;
>
> static jmethodID getReqFdMethodId ;
> static jmethodID getReqOffsetMethodId ;
> static jmethodID getReqPositionMethodId ;
> static jmethodID getReqLengthMethodId ;
> static jmethodID getReqBufMethodId ;
> static jclass asyReadRequestClass ;
>
> //static GAsyncQueue *taskQueue ;
>
> static pthread_t threadId;    //pthread_create() expects a pthread_t
> static struct sigaction sig_act;
> static pthread_mutex_t mutex ;
> static payload_list requests_list;
> static pthread_cond_t waitCond ;
>
>
> void add_payload(payload_list *my_list,payload *my_payload){
>     if(my_payload == NULL){
>         return ;
>     }
>     list_point *p = (list_point *)malloc(sizeof(list_point));
>     p->value = my_payload;
>     p->next = NULL;
>     pthread_mutex_lock(&mutex);
>     my_list->size ++;
>     list_point *tail = my_list->tail;
>     if(tail == NULL){
>         list_point *head = my_list->head;
>         if(head == NULL){
>             my_list->head = p;
>         }else{
>             head->next = p;
>             my_list->tail = p;
>         }
>     }else{
>         tail->next = p;
>         p->prev = tail;
>         my_list->tail = p;
>     }
>     pthread_mutex_unlock(&mutex);
>     return ;
> }
>
> payload* get_first_payload(payload_list *my_list){
>     if(my_list == NULL){
>         return NULL;
>     }
>     payload *ret ;
>     pthread_mutex_lock(&mutex);
>     list_point *head = my_list->head;
>     list_point *tail = my_list->tail;
>     if(head == NULL){
>         ret = NULL;
>     }else{
>         ret = head->value;
>         list_point *second = head->next;
>         my_list->head = second;
>         if(tail == second){
>             my_list->tail = NULL;
>         }
>         free(head);
>     }
>     if(ret != NULL){
>         my_list->size --;
>     }
>     pthread_mutex_unlock(&mutex);
>     return ret;
> }
>
> void handle_payload(JNIEnv *env,payload *my_payload){
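>         //Note: aio_return() reports how many bytes were actually read;
>         //the SetByteArrayRegion call below copies aio_nbytes, which
>         //assumes the read was not short.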
>         struct aiocb *my_aiocb = my_payload->my_aiocb;
>         if(aio_error(my_aiocb) == 0){
>             //notify the data comes
>             aio_return(my_aiocb);
>             jbyteArray buf = my_payload->buf;
>             jbyte* nativeBytes = my_aiocb->aio_buf;
>             jobject req = my_payload->req;
>             (*env)->SetByteArrayRegion(env, buf, my_payload->offset,
> my_aiocb->aio_nbytes, (jbyte *)nativeBytes);
>             (*env)->CallVoidMethod(env,req,notifyClientMethodId,req);
>             (*env)->DeleteGlobalRef(env,req);
>             free(nativeBytes);
>             free(my_aiocb);
>             free(my_payload);
>         }else{
>             //Also should notify error
>             perror("no data!");
>         }
> }
>
> //This method is used by a thread to send the notification to JVM's thread
> void* startEntry(void* data){
>     JNIEnv *env;
>     int resId = (*jvm)->AttachCurrentThread(jvm,(void **)&env,NULL);
>     if(resId < 0){
>         fprintf(stderr,"The native thread cannot attach to JVM thread\n");
>         return NULL;
>     }
>     while(1){
>         payload *my_payload = NULL;
>         while( (my_payload = get_first_payload(&requests_list)) != NULL){
>             handle_payload(env,my_payload);
>         }
>         if(requests_list.size > 0){
>             continue ;
>         }
>         pthread_mutex_lock(&mutex);
>         if(requests_list.size <= 0){
>             pthread_cond_wait(&waitCond,&mutex);
>         }
>         pthread_mutex_unlock(&mutex);
> //        payload *my_payload = (payload*)(g_async_queue_pop(taskQueue));
> //        //printf("Receive %ld\n",recCnt);
> //        handle_payload(env,my_payload);
>     }
>     (*jvm)->DetachCurrentThread(jvm);
>     return NULL;
> }
>
> //This is the signal handler
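> //Note: the handler calls malloc (via add_payload) and the
> //pthread_mutex/pthread_cond functions, which are not on the POSIX
> //async-signal-safe list; this works in practice here but relies on
> //implementation behaviour.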
> void finishReadHandler( int signo, siginfo_t *info, void *context ){
>   /* Ensure it's our signal */
>   if (info->si_signo == SIG_AIO) {
>           //g_async_queue_push(taskQueue,info->si_value.sival_ptr);
>           add_payload(&requests_list,info->si_value.sival_ptr);
>           pthread_mutex_lock(&mutex);
>           pthread_cond_broadcast(&waitCond);
>           pthread_mutex_unlock(&mutex);
>
>           /*static int cnt = 0;
>           cnt ++;
>           printf("receive signal %ld\n",cnt);*/
>       }
> }
>
> //This is the thread call back
> void finishRead(sigval_t sig){
>     payload *my_payload = (payload*)sig.sival_ptr;
>     struct aiocb *my_aiocb = my_payload->my_aiocb;
> /*    if(aio_error(my_aiocb) != 0){
>         perror("finish reading err");
>         return ;
>     }*/
>     JNIEnv *env;
>     int resId = (*jvm)->AttachCurrentThread(jvm,(void **)&env,NULL);
>     if(resId < 0){
>         fprintf(stderr,"Cannot attach to JVM thread\n");
>         return ;
>     }
>
>     jobject req = my_payload->req;
>     jbyteArray buf = my_payload->buf;
>     jbyte* nativeBytes = my_aiocb->aio_buf;
>     (*env)->SetByteArrayRegion(env, buf, my_payload->offset,
> my_aiocb->aio_nbytes, (jbyte *)nativeBytes);
>     (*env)->CallVoidMethod(env,req,notifyClientMethodId,req);
>     (*env)->DeleteGlobalRef(env,req);
>     (*jvm)->DetachCurrentThread(jvm);
>
>     free(nativeBytes);
>     free(my_aiocb);
>     free(my_payload);
> }
>
> void printMyaiocb(struct aiocb *myaiocb){
>     printf("fd is %d;offset is %d;length is
> %d\n",myaiocb->aio_fildes,myaiocb->aio_offset,myaiocb->aio_nbytes);
> }
> /*
>  * Class:     com_foo_io_AsyRandomAccessFile
>  * Method:    read
>  * Signature: (IIJILjava/nio/ByteBuffer;Lcom/telenav/io/AsyReadRequest;)V
>  */
> JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_read
>   (JNIEnv *env, jclass klass, jint fd, jint offset, jlong position, jint
> length, jbyteArray byteBuf, jobject req){
>      struct aiocb *my_aiocb = (struct aiocb*)malloc(sizeof(struct aiocb));
>      payload *my_payload = (payload*)malloc(sizeof(payload));
>
>      bzero(my_aiocb,sizeof(struct aiocb));
>      bzero(my_payload,sizeof(payload));
>
>      my_aiocb->aio_fildes = fd;
>      my_aiocb->aio_offset = position;
>      my_aiocb->aio_nbytes = length;
>      my_aiocb->aio_buf = malloc(sizeof(char)*(length+1));
>      my_payload->offset = offset;
>      my_payload->my_aiocb = my_aiocb ;
>      my_payload->req = (*env)->NewGlobalRef(env,req);
>      my_payload->buf = (*env)->NewGlobalRef(env,byteBuf);
>
> #ifndef SIGNAL_CALL_BACK
>      my_aiocb->aio_sigevent.sigev_notify = SIGEV_THREAD;
>      my_aiocb->aio_sigevent.sigev_notify_function = finishRead;
>      my_aiocb->aio_sigevent.sigev_notify_attributes = NULL;
>      my_aiocb->aio_sigevent.sigev_value.sival_ptr = my_payload;
> #endif
>
> #ifdef SIGNAL_CALL_BACK
>     my_aiocb->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
>     my_aiocb->aio_sigevent.sigev_signo = SIG_AIO;
>     my_aiocb->aio_sigevent.sigev_value.sival_ptr = my_payload;
> #endif
>      if(aio_read(my_aiocb) < 0){
>          perror("aio_reading");
>      }
>  }
>
> /*
>  * Class:     com_foo_io_AsyRandomAccessFile
>  * Method:    openFd
>  * Signature: (Ljava/lang/String;I)I
>  */
> JNIEXPORT jint JNICALL Java_com_foo_io_AsyRandomAccessFile_openFd
>   (JNIEnv *env, jobject raf, jstring fileName, jint acc){
>       int accFlags = O_RDONLY;
>       switch(acc){
>               case ASYRAF_READ:
>                   accFlags = O_RDONLY;
>               break;
>               case ASYRAF_WRITE:
>                   accFlags = O_WRONLY;
>               break;
>               case ASYRAF_RW:
>                   accFlags = O_RDWR;
>               break;
>           }
>       const char* charStr = (*env)->GetStringUTFChars(env,fileName,NULL);
>       int fd = open(charStr,accFlags);
>       (*env)->ReleaseStringUTFChars(env,fileName,charStr);
>       return fd;
>   }
> /*
>  * Class:     com_foo_io_AsyRandomAccessFile
>  * Method:    close
>  * Signature: ()V
>  */
> JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_close
>   (JNIEnv *env, jobject raf){
>      jint fd = (*env)->CallIntMethod(env,raf,getFdMethodId);
>      close(fd);
>  }
>
>  //JNI_OnLoad requires JNI 1.2 or newer; earlier JNI versions are not supported
> jint JNI_OnLoad(JavaVM *vm, void *reserved){
>     JNIEnv *env ;
>     jvm = vm ;
>     int resId = (*vm)->AttachCurrentThread(vm,(void **)&env,NULL);
>     if(resId < 0){
>         fprintf(stderr,"Cannot attach to JVM thread\n");
>         return JNI_VERSION_1_4;
>     }
>     jclass klass =
> (*env)->FindClass(env,"com/telenav/io/AsyRandomAccessFile");
>     asyRafClass = (*env)->NewGlobalRef(env,klass);
>     getFdMethodId = (*env)->GetMethodID(env,asyRafClass,"getFd","()I");
>     klass = (*env)->FindClass(env,"com/telenav/io/AsyReadRequest");
>     asyReadRequestClass = (*env)->NewGlobalRef(env,klass);
>     getReqFdMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getFd","()I");
>     getReqOffsetMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getOffset","()I");
>     getReqPositionMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getPosition","()J");
>     getReqBufMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getBs","()[B");
>     getReqLengthMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getLength","()I");
>     notifyClientMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"notifyClient","()V");
>     (*jvm)->DetachCurrentThread(jvm);
> #ifdef SIGNAL_CALL_BACK
>     requests_list.head = NULL;
>     requests_list.tail = NULL;
>     requests_list.size = 0;
>     pthread_cond_init(&waitCond,NULL);
>     //Initialize the mutex before starting the worker thread that locks it.
>     pthread_mutex_init(&mutex,NULL);
>     int ret = pthread_create(&threadId,NULL,startEntry,NULL);
>     if(ret != 0){
>         //pthread_create returns the error code directly and does not set errno
>         fprintf(stderr,"Create the thread error: %d\n",ret);
>     }
>     sigemptyset(&sig_act.sa_mask);
>     sig_act.sa_flags = SA_SIGINFO;
>     sig_act.sa_sigaction = finishReadHandler;
>     ret = sigaction(  SIG_AIO, &sig_act, NULL );
>     if(ret != 0){
>         perror("Hook signal error!");
>     }
> #endif
>     printf("ok for jni load\n");
>     return JNI_VERSION_1_4;
> }
>
> void JNI_OnUnload(JavaVM *vm, void *reserved){
>     JNIEnv *env ;
>     jvm = vm ;
>     int resId = (*vm)->AttachCurrentThread(vm,(void **)&env,NULL);
>     if(resId < 0){
>         fprintf(stderr,"Cannot attach to JVM thread\n");
>         return ;
>     }
> #ifdef SIGNAL_CALL_BACK
>     //Terminate the thread,implement it later
> #endif
>     (*env)->DeleteGlobalRef(env,asyRafClass);
>     (*jvm)->DetachCurrentThread(jvm);
> }
>
> The performance is improved by about 70%~100% compared with the thread
> notification approach.
> Sadly, it's still slower than Sun's simple implementation. I've checked
> their code; it just calls read(2) for every read invocation, with no
> special tricks.
> As a mail from a Sun employee on the OpenJDK list mentioned, aio_read is
> aio_read(3) rather than aio_read(2). In other words, it's a library call,
> not a system call, and a real system call could benefit from hardware
> interrupts to improve performance. I also noted that Linux has had AIO
> support in the kernel since kernel 2.5 or so, but the POSIX aio_read path
> is still implemented in software.
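> As an aside, that in-kernel path is exposed through a different interface
> than POSIX aio_read(3): the io_setup/io_submit/io_getevents system calls,
> usually reached through libaio. As far as I know, for regular files it only
> really becomes asynchronous when the file is opened with O_DIRECT;
> otherwise io_submit tends to complete synchronously. A minimal, untested
> sketch (assuming libaio is installed and linking with -laio; the file name
> and sizes are just placeholders):
>
> #include <fcntl.h>
> #include <libaio.h>
> #include <stdio.h>
> #include <stdlib.h>
> #include <unistd.h>
>
> int main(void) {
>     int fd = open("/tmp/hugeFile.txt", O_RDONLY);
>     if (fd < 0) { perror("open"); return 1; }
>
>     io_context_t ctx = 0;
>     if (io_setup(32, &ctx) < 0) {            /* returns a negative error code */
>         fprintf(stderr, "io_setup failed\n");
>         return 1;
>     }
>
>     char *buf = malloc(1024);
>     struct iocb cb;
>     struct iocb *cbs[1] = { &cb };
>     io_prep_pread(&cb, fd, buf, 1024, 0);    /* read 1024 bytes at offset 0 */
>
>     if (io_submit(ctx, 1, cbs) < 0) {
>         fprintf(stderr, "io_submit failed\n");
>         return 1;
>     }
>
>     struct io_event events[1];
>     if (io_getevents(ctx, 1, 1, events, NULL) == 1)   /* wait for completion */
>         printf("read %ld bytes\n", (long)events[0].res);
>
>     io_destroy(ctx);
>     free(buf);
>     close(fd);
>     return 0;
> }
>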
> Indeed, asynchronous reads may improve throughput, but they require the
> application to move to an event-driven model. My idea of simply replacing
> RandomAccessFile may not work, because our application reads in a blocking
> fashion.
> I don't know of any successful large-scale system that uses the
> event-driven model, especially for managing large files. (Google, I guess?)
> I'm not sure whether this experiment should continue. Any comments are
> welcome.
> Thanks.
>
>
> On 1/16/07, yueyu lin <po...@gmail.com> wrote:
> >
> > I feel a bit uncomfortable sending mail to this list since I have never
> > used Mina, but I think this can be treated as a purely technical problem,
> > and discussing it may help us achieve better performance.
> >
> > First, I'll describe how I tested the performance.
> >
> >
> > Test data: File A larger than 1.2 Giga bytes. (Text file)
> > Test cases: File B to describe the test cases. (Text file)
> >          The format looks like:
> >               offset
> >               (1024 bytes)
> >               ...
> >          This test-case file is generated by a program: it randomly picks
> > an offset in File A, records the offset in File B, reads 1024 bytes from
> > that offset, and writes those bytes to File B. That makes it possible to
> > verify that the multi-threaded program gets the correct results.
> >
> > I have a DataCenter class that reads the test cases from File B and hands
> > them to the caller.
> > public class DataCeneter {
> >
> >     RandomAccessFile  raf = null;
> >
> >     public DataCeneter(String fileName){
> >         try {
> >             raf = new RandomAccessFile(fileName,"r");
> >         } catch (FileNotFoundException e) {
> >             e.printStackTrace();
> >         }
> >     }
> >
> >     public synchronized String[] readLines(){
> >         if(raf == null){
> >             return null;
> >         }
> >         try {
> >             String[] ret = new String[2];
> >             ret[0] = raf.readLine();
> >             byte[] bs = new byte[1024];
> >             raf.readFully(bs);
> >             raf.read();
> >             ret[1] = new String(bs);
> >             return ret;
> >         } catch (IOException e) {
> >             return null;
> >         }
> >     }
> >
> > }
> >
> >
> > The test program will only ask the threads to call the readLines function
> > to get the test data; String[0] is the offset and String[1] is the value.
> > An abstract test class makes the test job easier and fair to every tester.
> > A file-reader implementation only needs to implement the read(int
> > position) and init(String fileName) functions; other jobs, like comparing
> > results, are the same for every implementation.
> >
> > public abstract class FileReaderTest {
> >
> >     int threadNum = 3;
> >     DataCeneter dc = null;
> >     int runningThreads ;
> >     Object mutex = new Object();
> >     int amount = 0;
> >
> >     public FileReaderTest(int threadNum,String fileName,String
> > srcFileName){
> >         this.threadNum = threadNum;
> >         dc = new DataCeneter(fileName);
> >         runningThreads = threadNum;
> >         init(srcFileName);
> >     }
> >
> >     public abstract void init(String srcFileName) ;
> >
> >     public abstract String read(int position);
> >
> >     public void runTest(){
> >         for(int i = 0;i < threadNum;i ++){
> >             new Thread(new Runnable() {
> >                 public void run() {
> >                     String[] lines = null;
> >                     while( (lines = dc.readLines()) != null ){
> >                         String result = read(Integer.parseInt(lines[0]));
> >                         if(!lines[1].equals(result)){
> >                             System.err.println("Wrong output~");
> >                         }
> >                         amount ++;
> >                     }
> >                     synchronized (mutex) {
> >                         runningThreads --;
> >                         mutex.notifyAll();
> >                     }
> >                 }
> >             }).start();
> >         }
> >
> >         while(runningThreads > 0){
> >             synchronized (mutex) {
> >                 try {
> >                     mutex.wait();
> >                 } catch (InterruptedException e) {
> >                     // TODO Auto-generated catch block
> >                     e.printStackTrace();
> >                 }
> >                 if(runningThreads <= 0)
> >                     break;
> >             }
> >         }
> >
> >         System.out.println("Finished:"+amount);
> >     }
> > }
> >
> >
> > Then let's see the RandomAccessFileTest implementation.
> >
> > public class RafReaderTest extends FileReaderTest {
> >
> >     RandomAccessFile raf ;
> >
> >     public RafReaderTest(int threadNum, String fileName, String
> > srcFileName) {
> >         super(threadNum, fileName, srcFileName);
> >
> >     }
> >
> >
> >
> >     public void init(String srcFileName) {
> >         try {
> >             raf = new RandomAccessFile(srcFileName,"r");
> >         } catch (FileNotFoundException e) {
> >             e.printStackTrace();
> >             System.exit(-1);
> >         }
> >     }
> >
> >     public String read(int position) {
> >         byte[] bs = new byte[1024];
> >         int read = 0;
> >         int offset = 0;
> >         int length = 1024;
> >         synchronized(raf){
> >             try {
> >                 raf.seek(position);
> >                 while(read != 1024){
> >                         int r = raf.read(bs, offset, length);
> >                         offset += r;
> >                         length -= r;
> >                         read += r;
> >                 }
> >             }catch (IOException e) {
> >                 e.printStackTrace();
> >             }
> >         }
> >         return new String(bs);
> >     }
> >
> >     public static void main(String[] args){
> >         RafReaderTest rrt = new
> > RafReaderTest(5,"/home/yueyulin/access.txt","/home/yueyulin/hugeFile.txt");
> >         long start = System.currentTimeMillis();
> >         rrt.runTest();
> >         long end = System.currentTimeMillis ();
> >         System.out.println("Raf read time is "+(end-start));
> >     }
> >
> > }
> >
> > If we open only one file descriptor, we have to lock around every read to
> > keep other threads from changing the shared file offset.
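> > A side note on why the lock is needed: seek() + read() share a single
> > file offset per descriptor. At the native level, pread(2) takes an
> > explicit offset and leaves the shared offset untouched, so no lock is
> > required; in Java, FileChannel.read(ByteBuffer, long position) exposes
> > the same idea. A minimal sketch of a hypothetical helper (not part of
> > the test code):
> >
> > #include <unistd.h>
> >
> > /* Positional read: does not move the shared file offset, so several
> >    threads can read from the same fd concurrently without locking. */
> > ssize_t read_at(int fd, void *buf, size_t len, off_t position) {
> >     return pread(fd, buf, len, position);
> > }
> >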
> > I have implemented another class to use Linux aio_read to read data. The
> > file reader test class is
> > public class AsyReaderTest extends FileReaderTest {
> >
> >     private AsyRandomAccessFile araf ;
> >
> >     public AsyReaderTest(int threadNum, String fileName, String
> > srcFileName) {
> >         super(threadNum, fileName, srcFileName);
> >     }
> >
> >     @Override
> >     public void init(String srcFileName) {
> >         // TODO Auto-generated method stub
> >         araf = new AsyRandomAccessFile(srcFileName,"r");
> >     }
> >
> >     @Override
> >     public String read(int position) {
> >         byte[] bs = new byte[1024];
> >         int offset = 0;
> >         int length = 1024;
> >         araf.read(bs, offset, position, length);
> >         return new String(bs);
> >     }
> >
> >     public static void main(String[] args){
> >         AsyReaderTest rrt = new
> > AsyReaderTest(5,"/home/yueyulin/access.txt","/home/yueyulin/hugeFile.txt");
> >         long start = System.currentTimeMillis();
> >         rrt.runTest();
> >         long end = System.currentTimeMillis ();
> >         System.out.println("Araf read time is "+(end-start));
> >     }
> >
> > }
> >
> >
> > I generated a file named access.txt to contain the test cases. It holds
> > 20000 cases, about 20 MB or so.
> >
> > The results differ depending on the situation.
> >
> >
> >    1. When I run these two test programs for the first time, the
> >    asynchronous version is much better than the RandomAccessFile version.
> >    The detailed results are:
> >
> >    - RandomAccessFileTest: 20000 cases, time: 120 seconds
> >    - AsyRandomAccessFileTest: 20000 cases, time: 70 seconds
> >
> >    2. You will notice the speed is slow. That is explainable: the system
> >    has just started and nothing is in the cache yet. Linux caches the most
> >    useful data in memory, so the next time I run the tests, the
> >    performance is much better:
> >
> >    - RandomAccessFileTest: 20000 cases, time: 1~1.9 seconds
> >    - AsyRandomAccessFileTest: 20000 cases, time: 4~4.5 seconds
> >
> >    Although performance is much better than on the initial run, the
> >    results are not what we expected. The asynchronous version is even
> >    slower than the synchronous version in a multi-threaded environment.
> >    (I will send out the detailed JNI implementation in a later
> >    discussion.)
> >    I was also impressed by RandomAccessFile's good performance, so I went
> >    into the Java sources to find out why.
> >    I have a copy of Java's source code. I won't describe how to find the
> >    right place in that pile of code; I'll just pick out the relevant
> >    parts.
> >    In the $JAVA_SOURCE/j2se/src/share/native/java/io directory, the file
> >    "io_util.c" shows how the read is done:
> > /* The maximum size of a stack-allocated buffer.
> >  */
> > #define BUF_SIZE 8192
> >
> >
> > int
> > readBytes(JNIEnv *env, jobject this, jbyteArray bytes,
> >       jint off, jint len, jfieldID fid)
> > {
> >     int nread, datalen;
> >     char stackBuf[BUF_SIZE];
> >     char *buf = 0;
> >     FD fd;
> >
> >     if (IS_NULL(bytes)) {
> >     JNU_ThrowNullPointerException(env, 0);
> >     return -1;
> >     }
> >     datalen = (*env)->GetArrayLength(env, bytes);
> >
> >     if ((off < 0) || (off > datalen) ||
> >         (len < 0) || ((off + len) > datalen) || ((off + len) < 0)) {
> >         JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", 0);
> >     return -1;
> >     }
> >
> >     if (len == 0) {
> >     return 0;
> >     } else if (len > BUF_SIZE) {
> >         buf = malloc(len);
> >     if (buf == 0) {
> >         JNU_ThrowOutOfMemoryError(env, 0);
> >         return 0;
> >         }
> >     } else {
> >         buf = stackBuf;
> >     }
> >
> >     fd = GET_FD(this, fid);
> >     nread = IO_Read(fd, buf, len);
> >     if (nread > 0) {
> >         (*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);
> >     } else if (nread == JVM_IO_ERR) {
> >     JNU_ThrowIOExceptionWithLastError(env, "Read error");
> >     } else if (nread == JVM_IO_INTR) { /* EOF */
> >         JNU_ThrowByName(env, "java/io/InterruptedIOException", 0);
> >     } else { /* EOF */
> >     nread = -1;
> >     }
> >
> >     if (buf != stackBuf) {
> >         free(buf);
> >     }
> >     return nread;
> > }
> >
> > The code above doesn't contain many tricks. The only well-known technique
> > is using a stack allocation (char stackBuf[BUF_SIZE];) for small reads
> > instead of asking the heap for memory (char *buf = 0;) every time. For
> > aio_read we have no way to use stack memory, because a callback function
> > must be used, and I don't think this small optimization is the key point
> > anyway.
> > Then I found that Sun's implementation uses IO_Read(fd, buf, len) to do
> > the real read. This macro is defined in
> > $JAVA_SOURCE/j2se/src/solaris/native/java/io/io_util_md.h:
> > #define IO_Read JVM_Read
> >
> > But when I went looking for the JVM_Read code, I found nothing. I only
> > found its declaration in jvm.h; in VM.c I found nothing useful either. I
> > suspect the key code is in the closed (or missing?) sources.
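> > My guess is that JVM_Read is essentially a thin wrapper over read(2) that
> > restarts the call when a signal interrupts it, something like the sketch
> > below. This is only an assumption on my part, not Sun's actual source:
> >
> > #include <errno.h>
> > #include <unistd.h>
> >
> > /* Hypothetical sketch of what JVM_Read likely boils down to:
> >    a plain read(2), retried when interrupted by a signal. */
> > ssize_t jvm_read_sketch(int fd, void *buf, size_t nbytes) {
> >     ssize_t n;
> >     do {
> >         n = read(fd, buf, nbytes);
> >     } while (n < 0 && errno == EINTR);
> >     return n;
> > }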
> >
> > My next plan is to subscribe to the OpenJDK mailing list and ask for the
> > JVM_Read implementation, at least the implementation for POSIX systems.
> >
> > Thanks for reading this long and boring mail.  In fact, I think that if
> > we give up some of Java's cross-platform portability, Java can do a lot
> > of things elegantly. But Sun's engineers have done a lot more than I
> > thought. I hope we can do something to really bring AIO to Java.
> >
> > On 1/16/07, Mike Heath <mh...@apache.org> wrote:
> > >
> > > The Asynchronous File I/O code is still in an experimental state.  There
> > > are some known performance issues and I'm working on ways to overcome
> > > them.  I am actively working on the library and I appreciate your
> > > feedback.
> > >
> > > Depending on your situation, you may be able to do all the file
> > > processing in a separate thread using the Java 5 concurrency library to
> > > get asynchronous I/O without the pains of using JNI.
> > >
> > > Within the next few weeks, I plan to release some code that does
> > > asynchronous file I/O through a separate thread.  This will work on all
> > > platforms and not just Linux.  It will also give me a baseline that can
> > > be used for performance testing the use of POSIX AIO calls.  Within the
> > > next few months I plan to do a lot of performance testing and tweaking.
> > > Any help with this would be appreciated.
> > >
> > > If you have any additional questions, concerns or other feedback, please
> > > let me know.
> > >
> > > -Mike
> > >
> > > On Mon, 2007-01-15 at 18:14 +0800, yueyu lin wrote:
> > > > Hi, everyone,
> > > >    I found that among the mina contributions there is an
> > > > AsynchronousFileIO that uses aio_read on Linux.
> > > >    The URL is http://mina.apache.org/asynchronous-file-io-in-java.html
> > > >
> > > >   It happens that I have decided to try to replace the
> > > > RandomAccessFile in my project. Our system is a large and busy system
> > > > based on huge files on Linux. When profiling, I found a lot of
> > > > blocking/waiting in file IO on the same file descriptor. When some
> > > > operation issues a lot of IO requests (which is quite common in our
> > > > system), other threads are affected.
> > > >   So I just finished an experimental modification that invokes
> > > > aio_read on Linux through JNI. Since our system runs only on Linux,
> > > > that would be fine for us.
> > > >   But when I tested the performance, the results depressed me. The
> > > > performance is three times lower than RandomAccessFile with 5 threads.
> > > >
> > > >   I searched the internet to see if anyone else had tried something
> > > > like what I did. Then I found it among the mina contributions. The
> > > > code is almost the same as mine; the only difference is that I'm
> > > > using C while the contributor's code uses C++.
> > > >   I want to know whether you tested the performance and compared it
> > > > with Sun's RandomAccessFile. What was the result?
> > > >   I also looked into Sun's native methods, which were opened some time
> > > > ago. j2se/share/native/java/io/io_util.c has the detailed code. It
> > > > doesn't have any tricky code; the only trick is that it uses a
> > > > stack-allocated byte array if the read length is less than 8K. But
> > > > with aio_read that doesn't work, because we have to allocate new
> > > > memory to hold the results or use the byte array passed into the
> > > > native code, and it has to be on the heap.  Even so, I don't think it
> > > > matters that much.
> > > >   Thanks in advance.
> > > >   I also want to know how to contact Mike
> > > > Heath<http://cwiki.apache.org/confluence/users/viewuserprofile.action?username=mheath
> > > > >. He contributed the code.
> > > >
> > > >   BTW: I'm even using the same development environment as Mike Heath
> > > > (Ubuntu 6.10).
> > >
> > >
> >
> >
> > --
> > --
> > Yueyu Lin
>
>
>
>
> --
> --
> Yueyu Lin
>
>

Re: Is there someone trying the AsynchronousFileIO?

Posted by yueyu lin <po...@gmail.com>.
I suspect O_DIRECT will hurt performance because it cannot make good use of
the Linux page cache.
Anyway, I will try the option and see whether it is better.

On 2/3/07, Mike Heath <mh...@apache.org> wrote:
>
> I've been looking over your AIO stuff in more detail and I think part
> of your performance problem might be that you're not opening the file
> with the O_DIRECT flag set.  I would be very interested to see what
> performance difference setting this flag makes.
>
> -Mike
>



-- 
--
Yueyu Lin