You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@mina.apache.org by Mike Heath <mh...@apache.org> on 2007/02/02 22:49:44 UTC
Re: Is there someone trying the AsynchronousFileIO?
I've been looking over your AIO stuff in more detail and I think part
of your performance problem might be that you're not opening the file
with O_DIRECT flag set. I would be very interested to see the
performance difference setting this flag might make.
-Mike
On 1/17/07, yueyu lin <po...@gmail.com> wrote:
> OK, I finally implement a stable version based on the signal call in
> aio_read .
> Here is the source codes:
> The header:
> /* DO NOT EDIT THIS FILE - it is machine generated */
> #include <jni.h>
> /* Header for class com_foo_io_AsyRandomAccessFile */
>
> #ifndef _Included_com_foo_io_AsyRandomAccessFile
> #define _Included_com_foo_io_AsyRandomAccessFile
> #ifdef __cplusplus
> extern "C" {
> #endif
> /*
> * Class: com_foo_io_AsyRandomAccessFile
> * Method: read
> * Signature: (IIJI[BLcom/foo/io/AsyReadRequest;)V
> */
> JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_read
> (JNIEnv *, jclass, jint, jint, jlong, jint, jbyteArray, jobject);
>
> /*
> * Class: com_foo_io_AsyRandomAccessFile
> * Method: openFd
> * Signature: (Ljava/lang/String;I)I
> */
> JNIEXPORT jint JNICALL Java_com_foo_io_AsyRandomAccessFile_openFd
> (JNIEnv *, jclass, jstring, jint);
>
> /*
> * Class: com_foo_io_AsyRandomAccessFile
> * Method: close
> * Signature: ()V
> */
> JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_close
> (JNIEnv *, jobject);
>
> #ifdef __cplusplus
> }
> #endif
> #endif
>
> The C source code:
> #include <fcntl.h>
> #include <unistd.h>
> #include <aio.h>
> #include <stdlib.h>
> #include <errno.h>
> #include <sys/stat.h>
> #include <pthread.h>
> #include "com_foo_io_AsyRandomAccessFile.h"
> #define ASYRAF_READ 0
> #define ASYRAF_WRITE 1
> #define ASYRAF_RW 2
> #define SIGNAL_CALL_BACK
> #define MAX_BUF 8092
> #define NO_LIST_DEBUG
> //The signal number to use.
> #define SIG_AIO SIGRTMIN+5
> struct _payload {
> jobject req;
> jbyteArray buf ;
> jint offset;
> struct aiocb *my_aiocb;
> };
>
> typedef struct _payload payload;
> payload empty_payload;
>
> struct _list_point{
> payload *value;
> struct _list_point *prev;
> struct _list_point *next;
> };
> typedef struct _list_point list_point;
> struct _payload_list{
> int size ;
> list_point *head;
> list_point *tail;
> };
> typedef struct _payload_list payload_list;
>
>
> static JavaVM *jvm;
> static jmethodID getFdMethodId ;
> static jmethodID notifyClientMethodId ;
> static jclass asyRafClass ;
>
> static jmethodID getReqFdMethodId ;
> static jmethodID getReqOffsetMethodId ;
> static jmethodID getReqPositionMethodId ;
> static jmethodID getReqLengthMethodId ;
> static jmethodID getReqBufMethodId ;
> static jclass asyReadRequestClass ;
>
> //static GAsyncQueue *taskQueue ;
>
> static int threadId;
> static struct sigaction sig_act;
> static pthread_mutex_t mutex ;
> static payload_list requests_list;
> static pthread_cond_t waitCond ;
>
>
> void add_payload(payload_list *my_list,payload *my_payload){
> if(my_payload == NULL){
> return ;
> }
> list_point *p = (list_point *)malloc(sizeof(list_point));
> p->value = my_payload;
> p->next = NULL;
> pthread_mutex_lock(&mutex);
> my_list->size ++;
> list_point *tail = my_list->tail;
> if(tail == NULL){
> list_point *head = my_list->head;
> if(head == NULL){
> my_list->head = p;
> }else{
> head->next = p;
> my_list->tail = p;
> }
> }else{
> tail->next = p;
> p->prev = tail;
> my_list->tail = p;
> }
> pthread_mutex_unlock(&mutex);
> return ;
> }
>
> payload* get_first_payload(payload_list *my_list){
> if(my_list == NULL){
> return NULL;
> }
> payload *ret ;
> pthread_mutex_lock(&mutex);
> list_point *head = my_list->head;
> list_point *tail = my_list->tail;
> if(head == NULL){
> ret = NULL;
> }else{
> ret = head->value;
> list_point *second = head->next;
> my_list->head = second;
> if(tail == second){
> my_list->tail = NULL;
> }
> free(head);
> }
> if(ret != NULL){
> my_list->size --;
> }
> pthread_mutex_unlock(&mutex);
> return ret;
> }
>
> void handle_payload(JNIEnv *env,payload *my_payload){
> struct aiocb *my_aiocb = my_payload->my_aiocb;
> if(aio_error(my_aiocb) == 0){
> //notify the data comes
> aio_return(my_aiocb);
> jbyteArray buf = my_payload->buf;
> jbyte* nativeBytes = my_aiocb->aio_buf;
> jobject req = my_payload->req;
> (*env)->SetByteArrayRegion(env, buf, my_payload->offset,
> my_aiocb->aio_nbytes, (jbyte *)nativeBytes);
> (*env)->CallVoidMethod(env,req,notifyClientMethodId,req);
> (*env)->DeleteGlobalRef(env,req);
> free(nativeBytes);
> free(my_aiocb);
> free(my_payload);
> }else{
> //Also should notify error
> perror("no data!");
> }
> }
>
> //This method is used by a thread to send the notification to JVM's thread
> void* startEntry(void* data){
> JNIEnv *env;
> int resId = (*jvm)->AttachCurrentThread(jvm,(void **)&env,NULL);
> if(resId < 0){
> fprintf(stderr,"The native thread cannot attach to JVM thread\n");
> return NULL;
> }
> while(1){
> payload *my_payload = NULL;
> while( (my_payload = get_first_payload(&requests_list)) != NULL){
> handle_payload(env,my_payload);
> }
> if(requests_list.size > 0){
> continue ;
> }
> pthread_mutex_lock(&mutex);
> if(requests_list.size <= 0){
> pthread_cond_wait(&waitCond,&mutex);
> }
> pthread_mutex_unlock(&mutex);
> // payload *my_payload = (payload*)(g_async_queue_pop(taskQueue));
> // //printf("Receive %ld\n",recCnt);
> // handle_payload(env,my_payload);
> }
> (*jvm)->DetachCurrentThread(jvm);
> return NULL;
> }
>
> //This is the signal handler
> void finishReadHandler( int signo, siginfo_t *info, void *context ){
> /* Ensure it's our signal */
> if (info->si_signo == SIG_AIO) {
> //g_async_queue_push(taskQueue,info->si_value.sival_ptr);
> add_payload(&requests_list,info->si_value.sival_ptr);
> pthread_mutex_lock(&mutex);
> pthread_cond_broadcast(&waitCond);
> pthread_mutex_unlock(&mutex);
>
> /*static int cnt = 0;
> cnt ++;
> printf("receive signal %ld\n",cnt);*/
> }
> }
>
> //This is the thread call back
> void finishRead(sigval_t sig){
> payload *my_payload = (payload*)sig.sival_ptr;
> struct aiocb *my_aiocb = my_payload->my_aiocb;
> /* if(aio_error(my_aiocb) != 0){
> perror("finish reading err");
> return ;
> }*/
> JNIEnv *env;
> int resId = (*jvm)->AttachCurrentThread(jvm,(void **)&env,NULL);
> if(resId < 0){
> fprintf(stderr,"Cannot attach to JVM thread\n");
> return ;
> }
>
> jobject req = my_payload->req;
> jbyteArray buf = my_payload->buf;
> jbyte* nativeBytes = my_aiocb->aio_buf;
> (*env)->SetByteArrayRegion(env, buf, my_payload->offset,
> my_aiocb->aio_nbytes, (jbyte *)nativeBytes);
> (*env)->CallVoidMethod(env,req,notifyClientMethodId,req);
> (*env)->DeleteGlobalRef(env,req);
> (*jvm)->DetachCurrentThread(jvm);
>
> free(nativeBytes);
> free(my_aiocb);
> free(my_payload);
> }
>
> void printMyaiocb(struct aiocb *myaiocb){
> printf("fd is %d;offset is %d;length is
> %d\n",myaiocb->aio_fildes,myaiocb->aio_offset,myaiocb->aio_nbytes);
> }
> /*
> * Class: com_foo_io_AsyRandomAccessFile
> * Method: read
> * Signature: (IIJILjava/nio/ByteBuffer;Lcom/telenav/io/AsyReadRequest;)V
> */
> JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_read
> (JNIEnv *env, jclass klass, jint fd, jint offset, jlong position, jint
> length, jbyteArray byteBuf, jobject req){
> struct aiocb *my_aiocb = (struct aiocb*)malloc(sizeof(struct aiocb));
> payload *my_payload = (payload*)malloc(sizeof(payload));
>
> bzero(my_aiocb,sizeof(struct aiocb));
> bzero(my_payload,sizeof(payload));
>
> my_aiocb->aio_fildes = fd;
> my_aiocb->aio_offset = position;
> my_aiocb->aio_nbytes = length;
> my_aiocb->aio_buf = malloc(sizeof(char)*(length+1));
> my_payload->offset = offset;
> my_payload->my_aiocb = my_aiocb ;
> my_payload->req = (*env)->NewGlobalRef(env,req);
> my_payload->buf = (*env)->NewGlobalRef(env,byteBuf);
>
> #ifndef SIGNAL_CALL_BACK
> my_aiocb->aio_sigevent.sigev_notify = SIGEV_THREAD;
> my_aiocb->aio_sigevent.sigev_notify_function = finishRead;
> my_aiocb->aio_sigevent.sigev_notify_attributes = NULL;
> my_aiocb->aio_sigevent.sigev_value.sival_ptr = my_payload;
> #endif
>
> #ifdef SIGNAL_CALL_BACK
> my_aiocb->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
> my_aiocb->aio_sigevent.sigev_signo = SIG_AIO;
> my_aiocb->aio_sigevent.sigev_value.sival_ptr = my_payload;
> #endif
> if(aio_read(my_aiocb) < 0){
> perror("aio_reading");
> }
> }
>
> /*
> * Class: com_foo_io_AsyRandomAccessFile
> * Method: openFd
> * Signature: (Ljava/lang/String;I)I
> */
> JNIEXPORT jint JNICALL Java_com_foo_io_AsyRandomAccessFile_openFd
> (JNIEnv *env, jobject raf, jstring fileName, jint acc){
> int accFlags = O_RDONLY;
> switch(acc){
> case ASYRAF_READ:
> accFlags = O_RDONLY;
> break;
> case ASYRAF_WRITE:
> accFlags = O_WRONLY;
> break;
> case ASYRAF_RW:
> accFlags = O_RDWR;
> break;
> }
> const jchar* charStr = (*env)->GetStringUTFChars(env,fileName,NULL);
> int fd = open(charStr,accFlags);
> (*env)->ReleaseStringChars(env,fileName,charStr);
> return fd;
> }
> /*
> * Class: com_foo_io_AsyRandomAccessFile
> * Method: close
> * Signature: ()V
> */
> JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_close
> (JNIEnv *env, jobject raf){
> jint fd = (*env)->CallIntMethod(env,raf,getFdMethodId);
> close(fd);
> }
>
> //Does not support the previous version
> jint JNI_OnLoad(JavaVM *vm, void *reserved){
> JNIEnv *env ;
> jvm = vm ;
> int resId = (*vm)->AttachCurrentThread(vm,(void **)&env,NULL);
> if(resId < 0){
> fprintf(stderr,"Cannot attach to JVM thread\n");
> return JNI_VERSION_1_4;
> }
> jclass klass =
> (*env)->FindClass(env,"com/telenav/io/AsyRandomAccessFile");
> asyRafClass = (*env)->NewGlobalRef(env,klass);
> getFdMethodId = (*env)->GetMethodID(env,asyRafClass,"getFd","()I");
> klass = (*env)->FindClass(env,"com/telenav/io/AsyReadRequest");
> asyReadRequestClass = (*env)->NewGlobalRef(env,klass);
> getReqFdMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getFd","()I");
> getReqOffsetMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getOffset","()I");
> getReqPositionMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getPosition","()J");
> getReqBufMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getBs","()[B");
> getReqLengthMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"getLength","()I");
> notifyClientMethodId =
> (*env)->GetMethodID(env,asyReadRequestClass,"notifyClient","()V");
> (*jvm)->DetachCurrentThread(jvm);
> #ifdef SIGNAL_CALL_BACK
> requests_list.head = NULL;
> requests_list.tail = NULL;
> requests_list.size = 0;
> pthread_cond_init(&waitCond,NULL);
> int ret = pthread_create(&threadId,NULL,startEntry,NULL);
> if(ret != 0){
> perror("Create the thread's error!");
> }
> sigemptyset(&sig_act.sa_mask);
> sig_act.sa_flags = SA_SIGINFO;
> sig_act.sa_sigaction = finishReadHandler;
> ret = sigaction( SIG_AIO, &sig_act, NULL );
> if(ret != 0){
> perror("Hook signal error!");
> }
> pthread_mutex_init(&mutex,NULL);
> #endif
> printf("ok for jni load\n");
> return JNI_VERSION_1_4;
> }
>
> void JNI_OnUnload(JavaVM *vm, void *reserved){
> JNIEnv *env ;
> jvm = vm ;
> int resId = (*vm)->AttachCurrentThread(vm,(void **)&env,NULL);
> if(resId < 0){
> fprintf(stderr,"Cannot attach to JVM thread\n");
> return ;
> }
> #ifdef SIGNAL_CALL_BACK
> //Terminate the thread,implement it later
> #endif
> (*env)->DeleteGlobalRef(env,asyRafClass);
> (*jvm)->DetachCurrentThread(jvm);
> }
>
> The performance is improved about 70%~100% compared with the thread
> notification way.
> But sadly, it's still slower than Sun's simple implementation. I've checked
> the code; it just calls read(2) for every read invocation. No special
> operations.
> Mentioned by the mail from Sun's employee in OpenJDK, the aio_read is
> aio_read(3)
> instead of aio_read(2). That means that's a lib call, not a syscall. The
> syscall may benefit from the legacy interrupt to improve performance. I
> also noted that Linux has already put the aio_read implemented in the kernel
> in the Kernel 2.5 or later. But it's still a software way to do that.
> Indeed, asynchronous reads may improve throughput, but they require the
> current application to change to an event-driven model. My idea of replacing
> RandomAccessFile may not work, because our application just works in a
> blocking-read fashion.
> I don't know if there is any successful large scale system that is using the
> event-driven model especially in large file system's management.(Google I
> guess?)
> I have no idea that if the experiment should continue. Any of your comments
> are welcome.
> Thanks.
>
>
> On 1/16/07, yueyu lin <po...@gmail.com> wrote:
> >
> > I feel uncomfortable to send mails in the maillist since I didn't ever use
> > Mina. But I think this issue can be a pure technical problem to discuss that
> > may help us to achieve a better performance.
> >
> > First, I'll describe how I tested the performance.
> >
> >
> > Test data: File A larger than 1.2 Giga bytes. (Text file)
> > Test cases: File B to describe the test cases. (Text file)
> > The format looks like:
> > offset
> > (1024 bytes)
> > ...
> > This test cases file is generated by a program. It will randomly
> > find an offset in File A and record the offset to File B, retrieve 1024
> > bytes from the offset and write these bytes to the File B. That will help to
> > verify the multiple threads program can get the correct results.
> >
> > I have a DataCenter class is to read the test cases from File B and give
> > them to the caller.
> > public class DataCeneter {
> >
> > RandomAccessFile raf = null;
> >
> > public DataCeneter(String fileName){
> > try {
> > raf = new RandomAccessFile(fileName,"r");
> > } catch (FileNotFoundException e) {
> > e.printStackTrace();
> > }
> > }
> >
> > public synchronized String[] readLines(){
> > if(raf == null){
> > return null;
> > }
> > try {
> > String[] ret = new String[2];
> > ret[0] = raf.readLine();
> > byte[] bs = new byte[1024];
> > raf.readFully(bs);
> > raf.read();
> > ret[1] = new String(bs);
> > return ret;
> > } catch (IOException e) {
> > return null;
> > }
> > }
> >
> > }
> >
> >
> > The test program will only ask the threads to call the readLines function
> > to get the test data. String[0] is the offset and String[1] is the
> > value. An abstract test class makes the test job easier and fair to every
> > tester. A different file reader implementation only needs to implement the
> > read(int position) and init(String fileName) functions. Other jobs, like
> > comparing results, will be the same in every kind of implementation.
> >
> > public abstract class FileReaderTest {
> >
> > int threadNum = 3;
> > DataCeneter dc = null;
> > int runningThreads ;
> > Object mutex = new Object();
> > int amount = 0;
> >
> > public FileReaderTest(int threadNum,String fileName,String
> > srcFileName){
> > this.threadNum = threadNum;
> > dc = new DataCeneter(fileName);
> > runningThreads = threadNum;
> > init(srcFileName);
> > }
> >
> > public abstract void init(String srcFileName) ;
> >
> > public abstract String read(int position);
> >
> > public void runTest(){
> > for(int i = 0;i < threadNum;i ++){
> > new Thread(new Runnable() {
> > public void run() {
> > String[] lines = null;
> > while( (lines = dc.readLines()) != null ){
> > String result = read(Integer.parseInt(lines[0]));
> > if(!lines[1].equals(result)){
> > System.err.println("Wrong output~");
> > }
> > amount ++;
> > }
> > synchronized (mutex) {
> > runningThreads --;
> > mutex.notifyAll();
> > }
> > }
> > }).start();
> > }
> >
> > while(runningThreads > 0){
> > synchronized (mutex) {
> > try {
> > mutex.wait();
> > } catch (InterruptedException e) {
> > // TODO Auto-generated catch block
> > e.printStackTrace();
> > }
> > if(runningThreads <= 0)
> > break;
> > }
> > }
> >
> > System.out.println("Finished:"+amount);
> > }
> > }
> >
> >
> > Then let's see the RandomAccessFileTest implementation.
> >
> > public class RafReaderTest extends FileReaderTest {
> >
> > RandomAccessFile raf ;
> >
> > public RafReaderTest(int threadNum, String fileName, String
> > srcFileName) {
> > super(threadNum, fileName, srcFileName);
> >
> > }
> >
> >
> >
> > public void init(String srcFileName) {
> > try {
> > raf = new RandomAccessFile(srcFileName,"r");
> > } catch (FileNotFoundException e) {
> > e.printStackTrace();
> > System.exit(-1);
> > }
> > }
> >
> > public String read(int position) {
> > byte[] bs = new byte[1024];
> > int read = 0;
> > int offset = 0;
> > int length = 1024;
> > synchronized(raf){
> > try {
> > raf.seek(position);
> > while(read != 1024){
> > int r = raf.read(bs, offset, length);
> > offset += r;
> > length -= r;
> > read += r;
> > }
> > }catch (IOException e) {
> > e.printStackTrace();
> > }
> > }
> > return new String(bs);
> > }
> >
> > public static void main(String[] args){
> > RafReaderTest rrt = new
> > RafReaderTest(5,"/home/yueyulin/access.txt","/home/yueyulin/hugeFile.txt");
> > long start = System.currentTimeMillis();
> > rrt.runTest();
> > long end = System.currentTimeMillis ();
> > System.out.println("Raf read time is "+(end-start));
> > }
> >
> > }
> >
> > If only open one file descriptor, everytime we have to lock the codes to
> > prevent from changing the offset.
> > I have implemented another class to use Linux aio_read to read data. The
> > file reader test class is
> > public class AsyReaderTest extends FileReaderTest {
> >
> > private AsyRandomAccessFile araf ;
> >
> > public AsyReaderTest(int threadNum, String fileName, String
> > srcFileName) {
> > super(threadNum, fileName, srcFileName);
> > }
> >
> > @Override
> > public void init(String srcFileName) {
> > // TODO Auto-generated method stub
> > araf = new AsyRandomAccessFile(srcFileName,"r");
> > }
> >
> > @Override
> > public String read(int position) {
> > byte[] bs = new byte[1024];
> > int offset = 0;
> > int length = 1024;
> > araf.read(bs, offset, position, length);
> > return new String(bs);
> > }
> >
> > public static void main(String[] args){
> > AsyReaderTest rrt = new
> > AsyReaderTest(5,"/home/yueyulin/access.txt","/home/yueyulin/hugeFile.txt");
> > long start = System.currentTimeMillis();
> > rrt.runTest();
> > long end = System.currentTimeMillis ();
> > System.out.println("Araf read time is "+(end-start));
> > }
> >
> > }
> >
> >
> > I generated a file named access.txt to contain the test cases. It contains
> > 20000 cases about 20 M or so.
> >
> > The results are different in different situation.
> >
> >
> > 1. When I run these two test programs first time. The Asynchronous
> > version is much better than the RandomAccessFile version. The detailed
> > results is :
> >
> >
> > - RandomAccessFileTest: 20000 cases, time: 120 seconds
> > - AsyRandomAccessFileTest: 20000 cases, time: 70 seconds
> >
> > 2 . You will find the speed is slow. That is explainable because
> > the system starts up and everything is not in cache. Linux will cache the
> > most useful data into
> > memory. So the next time when I run the tests, the
> > performance is much better.
> >
> > - RandomAccessFileTest: 20000 cases, time: 1~1.9 seconds
> > - AsyRandomAccessFileTest: 20000 cases, time 4~4.5 seconds.
> >
> > Although performance is much better than the initial running, the
> > results are not what we expected. The asynchronous version is even slower
> > than the synchronous version in multiple threads environment. (I will send
> > out the detailed implementation of JNI in the later discussion).
> > I was also impressed by the RandomAccessFile's good performance.
> > So I went into the Java's sources to find why.
> > I have a copy of Java's source codes. I don't want to describe how
> > to find the place in bunch of codes. I just pick up the related source codes
> > out.
> > In the $JAVA_SOURCE/j2se/src/share/native/java/io directory, there
> > is a file named "io_util.c" describes how the read does.
> > /* The maximum size of a stack-allocated buffer.
> > */
> > #define BUF_SIZE 8192
> >
> >
> > int
> > readBytes(JNIEnv *env, jobject this, jbyteArray bytes,
> > jint off, jint len, jfieldID fid)
> > {
> > int nread, datalen;
> > char stackBuf[BUF_SIZE];
> > char *buf = 0;
> > FD fd;
> >
> > if (IS_NULL(bytes)) {
> > JNU_ThrowNullPointerException(env, 0);
> > return -1;
> > }
> > datalen = (*env)->GetArrayLength(env, bytes);
> >
> > if ((off < 0) || (off > datalen) ||
> > (len < 0) || ((off + len) > datalen) || ((off + len) < 0)) {
> > JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException", 0);
> > return -1;
> > }
> >
> > if (len == 0) {
> > return 0;
> > } else if (len > BUF_SIZE) {
> > buf = malloc(len);
> > if (buf == 0) {
> > JNU_ThrowOutOfMemoryError(env, 0);
> > return 0;
> > }
> > } else {
> > buf = stackBuf;
> > }
> >
> > fd = GET_FD(this, fid);
> > nread = IO_Read(fd, buf, len);
> > if (nread > 0) {
> > (*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte *)buf);
> > } else if (nread == JVM_IO_ERR) {
> > JNU_ThrowIOExceptionWithLastError(env, "Read error");
> > } else if (nread == JVM_IO_INTR) { /* EOF */
> > JNU_ThrowByName(env, "java/io/InterruptedIOException", 0);
> > } else { /* EOF */
> > nread = -1;
> > }
> >
> > if (buf != stackBuf) {
> > free(buf);
> > }
> > return nread;
> > }
> >
> > The codes above don't contain a lot of tricky codes. The only well-known
> > technical is to use a stack allocation( char stackBuf[BUF_SIZE];) for
> > small memory allocation instead of asking for memory allocation from heap(char
> > *buf = 0;) every time. But for aio_read, we have no way to use the stack
> > memory because a call back function must be used. And I even don't think the
> > small codes are the key point.
> > Then I found Sun's implementation using IO_Read(fd, buf, len) to do the
> > real read operation. This macro is defined in
> > $JAVA_SOURCE/j2se/src/solaris/native/java/io/io_util_md.h.
> > #define IO_Read JVM_Read
> >
> > But when I want to look into the JVM_Read codes, I found nothing. I only
> > found its declaration in jvm.h. In VM.c, I still found nothing valuable.
> > I guess there are should some key codes in the closed(or missing?) codes.
> >
> > My next plan is to subscribe to the OpenJDK mailing list and ask for the
> > code of the JVM_Read implementation, at least the implementation on POSIX systems.
> >
> > Thanks for reading the long and boring mail. In fact, I think when we
> > limit Java's cross-platform ability, Java can do a lot of things elegantly.
> > But Sun's engineers have done a lot more than what I thought. I wish we
> > would do something to bring the aio really to Java.
> >
> > On 1/16/07, Mike Heath <mh...@apache.org> wrote:
> > >
> > > The Asynchronous File I/O code is still in an experimental state. There
> > > are some known performance issues and I'm working on ways to overcome
> > > them. I am actively working on the library and I appreciate your
> > > feedback.
> > >
> > > Depending on your situation, you may be able to do all the file
> > > processing in a separate thread using Java 5 concurrent library to get
> > > asynchronous I/O without the pains of using JNI.
> > >
> > > Within the next few weeks, I plan to release some code that does
> > > asynchronous file I/O through a separate thread. This will work on all
> > > platforms and not just Linux. It will also give me a baseline that can
> > > be used for performance testing the use of POSIX AIO calls. Within the
> > > next few months I plan to do a lot of performance testing and tweaking.
> > > Any help with this would be appreciated.
> > >
> > > If you have any additional questions, concerns or other feedback, please
> > > let me know.
> > >
> > > -Mike
> > >
> > > On Mon, 2007-01-15 at 18:14 +0800, yueyu lin wrote:
> > > > Hi, everyone,
> > > > I found in the mina contributors, there is an asynchronousFileIO
> > > that is
> > > > using aio_read in linux.
> > > > The URL is http://mina.apache.org/asynchronous-file-io-in-java.html
> > > >
> > > > It happens that I decide to try to replace the RandomAccessFile in
> > > my
> > > > project. Our system is a large and busy system based on huge files in
> > > Linux.
> > > > When profiling, I found that a lot of blocking/waiting in file IO on
> > > the
> > > > same file descriptor. When there is some operation asking for a lot of
> > > IO
> > > > requests(it's quiet often in our system), other threads will be
> > > affected.
> > > > So I just finished an experimental modification that invokes
> > > aio_read in
> > > > Linux in JNI. Since our system is running only in Linux, so it will be
> > > good
> > > > for us.
> > > > But when I tested the performance, the results depressed me. The
> > > > performance is three times lower than the RandomAccessFile in 5
> > > threads.
> > > >
> > > > I searched the internet to see if anyone else had tried what I did.
> > > > Then I found it in the MINA contributors. The code is almost the same
> > > > as mine; the only difference is that I'm using C and the contributors'
> > > > code is using C++.
> > > > I want to know if you tested the performance and compared with the
> > > sun's
> > > > RandomAccessFile? What's the result?
> > > > I also looked into sun's native method opened some time ago. The
> > > > j2se/share/native/java/io/io_util.c has the detailed codes. It doesn't
> > > have
> > > > any tricky codes. The only one is that it's using a stack variable
> > > byte
> > > > array if the read length is less than 8K. But in the aio_read, it
> > > doesn't
> > > > work. Because we have to allocate new memory to contain the results
> > > or
> > > > using the byte array passed into the native codes. It has to be in the
> > >
> > > > heap. Even so, I don't think it will affect so much .
> > > > Thanks in advance.
> > > > I also want to know how to contact Mike
> > > > Heath<http://cwiki.apache.org/confluence/users/viewuserprofile.action?username=mheath
> > > >.
> > > > He contributes the codes.
> > > >
> > > > BTW: I'm even using the same development environment with Mike
> > > Heath.
> > > > (Ubuntu 6.10).
> > >
> > >
> >
> >
> > --
> > --
> > Yueyu Lin
>
>
>
>
> --
> --
> Yueyu Lin
>
>
Re: Is there someone trying the AsynchronousFileIO?
Posted by yueyu lin <po...@gmail.com>.
I guess O_DIRECT will hurt the performance because it cannot use linux cache
well.
Anyway, I will try the option to see if it will be better.
On 2/3/07, Mike Heath <mh...@apache.org> wrote:
>
> I've been looking over your AIO stuff in more detail and I think part
> of your performance problem might be that you're not opening the file
> with O_DIRECT flag set. I would be very interested to see the
> performance difference setting this flag might make.
>
> -Mike
>
> On 1/17/07, yueyu lin <po...@gmail.com> wrote:
> > OK, I finally implement a stable version based on the signal call in
> > aio_read .
> > Here is the source codes:
> > The header:
> > /* DO NOT EDIT THIS FILE - it is machine generated */
> > #include <jni.h>
> > /* Header for class com_foo_io_AsyRandomAccessFile */
> >
> > #ifndef _Included_com_foo_io_AsyRandomAccessFile
> > #define _Included_com_foo_io_AsyRandomAccessFile
> > #ifdef __cplusplus
> > extern "C" {
> > #endif
> > /*
> > * Class: com_foo_io_AsyRandomAccessFile
> > * Method: read
> > * Signature: (IIJI[BLcom/foo/io/AsyReadRequest;)V
> > */
> > JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_read
> > (JNIEnv *, jclass, jint, jint, jlong, jint, jbyteArray, jobject);
> >
> > /*
> > * Class: com_foo_io_AsyRandomAccessFile
> > * Method: openFd
> > * Signature: (Ljava/lang/String;I)I
> > */
> > JNIEXPORT jint JNICALL Java_com_foo_io_AsyRandomAccessFile_openFd
> > (JNIEnv *, jclass, jstring, jint);
> >
> > /*
> > * Class: com_foo_io_AsyRandomAccessFile
> > * Method: close
> > * Signature: ()V
> > */
> > JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_close
> > (JNIEnv *, jobject);
> >
> > #ifdef __cplusplus
> > }
> > #endif
> > #endif
> >
> > The C source code:
> > #include <fcntl.h>
> > #include <unistd.h>
> > #include <aio.h>
> > #include <stdlib.h>
> > #include <errno.h>
> > #include <sys/stat.h>
> > #include <pthread.h>
> > #include "com_foo_io_AsyRandomAccessFile.h"
> > #define ASYRAF_READ 0
> > #define ASYRAF_WRITE 1
> > #define ASYRAF_RW 2
> > #define SIGNAL_CALL_BACK
> > #define MAX_BUF 8092
> > #define NO_LIST_DEBUG
> > //The signal number to use.
> > #define SIG_AIO SIGRTMIN+5
> > struct _payload {
> > jobject req;
> > jbyteArray buf ;
> > jint offset;
> > struct aiocb *my_aiocb;
> > };
> >
> > typedef struct _payload payload;
> > payload empty_payload;
> >
> > struct _list_point{
> > payload *value;
> > struct _list_point *prev;
> > struct _list_point *next;
> > };
> > typedef struct _list_point list_point;
> > struct _payload_list{
> > int size ;
> > list_point *head;
> > list_point *tail;
> > };
> > typedef struct _payload_list payload_list;
> >
> >
> > static JavaVM *jvm;
> > static jmethodID getFdMethodId ;
> > static jmethodID notifyClientMethodId ;
> > static jclass asyRafClass ;
> >
> > static jmethodID getReqFdMethodId ;
> > static jmethodID getReqOffsetMethodId ;
> > static jmethodID getReqPositionMethodId ;
> > static jmethodID getReqLengthMethodId ;
> > static jmethodID getReqBufMethodId ;
> > static jclass asyReadRequestClass ;
> >
> > //static GAsyncQueue *taskQueue ;
> >
> > static int threadId;
> > static struct sigaction sig_act;
> > static pthread_mutex_t mutex ;
> > static payload_list requests_list;
> > static pthread_cond_t waitCond ;
> >
> >
> > void add_payload(payload_list *my_list,payload *my_payload){
> > if(my_payload == NULL){
> > return ;
> > }
> > list_point *p = (list_point *)malloc(sizeof(list_point));
> > p->value = my_payload;
> > p->next = NULL;
> > pthread_mutex_lock(&mutex);
> > my_list->size ++;
> > list_point *tail = my_list->tail;
> > if(tail == NULL){
> > list_point *head = my_list->head;
> > if(head == NULL){
> > my_list->head = p;
> > }else{
> > head->next = p;
> > my_list->tail = p;
> > }
> > }else{
> > tail->next = p;
> > p->prev = tail;
> > my_list->tail = p;
> > }
> > pthread_mutex_unlock(&mutex);
> > return ;
> > }
> >
> > payload* get_first_payload(payload_list *my_list){
> > if(my_list == NULL){
> > return NULL;
> > }
> > payload *ret ;
> > pthread_mutex_lock(&mutex);
> > list_point *head = my_list->head;
> > list_point *tail = my_list->tail;
> > if(head == NULL){
> > ret = NULL;
> > }else{
> > ret = head->value;
> > list_point *second = head->next;
> > my_list->head = second;
> > if(tail == second){
> > my_list->tail = NULL;
> > }
> > free(head);
> > }
> > if(ret != NULL){
> > my_list->size --;
> > }
> > pthread_mutex_unlock(&mutex);
> > return ret;
> > }
> >
> > void handle_payload(JNIEnv *env,payload *my_payload){
> > struct aiocb *my_aiocb = my_payload->my_aiocb;
> > if(aio_error(my_aiocb) == 0){
> > //notify the data comes
> > aio_return(my_aiocb);
> > jbyteArray buf = my_payload->buf;
> > jbyte* nativeBytes = my_aiocb->aio_buf;
> > jobject req = my_payload->req;
> > (*env)->SetByteArrayRegion(env, buf, my_payload->offset,
> > my_aiocb->aio_nbytes, (jbyte *)nativeBytes);
> > (*env)->CallVoidMethod(env,req,notifyClientMethodId,req);
> > (*env)->DeleteGlobalRef(env,req);
> > free(nativeBytes);
> > free(my_aiocb);
> > free(my_payload);
> > }else{
> > //Also should notify error
> > perror("no data!");
> > }
> > }
> >
> > //This method is used by a thread to send the notification to JVM's
> thread
> > void* startEntry(void* data){
> > JNIEnv *env;
> > int resId = (*jvm)->AttachCurrentThread(jvm,(void **)&env,NULL);
> > if(resId < 0){
> > fprintf(stderr,"The native thread cannot attach to JVM
> thread\n");
> > return NULL;
> > }
> > while(1){
> > payload *my_payload = NULL;
> > while( (my_payload = get_first_payload(&requests_list)) !=
> NULL){
> > handle_payload(env,my_payload);
> > }
> > if(requests_list.size > 0){
> > continue ;
> > }
> > pthread_mutex_lock(&mutex);
> > if(requests_list.size <= 0){
> > pthread_cond_wait(&waitCond,&mutex);
> > }
> > pthread_mutex_unlock(&mutex);
> > // payload *my_payload =
> (payload*)(g_async_queue_pop(taskQueue));
> > // //printf("Receive %ld\n",recCnt);
> > // handle_payload(env,my_payload);
> > }
> > (*jvm)->DetachCurrentThread(jvm);
> > return NULL;
> > }
> >
> > //This is the signal handler
> > void finishReadHandler( int signo, siginfo_t *info, void *context ){
> > /* Ensure it's our signal */
> > if (info->si_signo == SIG_AIO) {
> > //g_async_queue_push(taskQueue,info->si_value.sival_ptr);
> > add_payload(&requests_list,info->si_value.sival_ptr);
> > pthread_mutex_lock(&mutex);
> > pthread_cond_broadcast(&waitCond);
> > pthread_mutex_unlock(&mutex);
> >
> > /*static int cnt = 0;
> > cnt ++;
> > printf("receive signal %ld\n",cnt);*/
> > }
> > }
> >
> > //This is the thread call back
> > void finishRead(sigval_t sig){
> > payload *my_payload = (payload*)sig.sival_ptr;
> > struct aiocb *my_aiocb = my_payload->my_aiocb;
> > /* if(aio_error(my_aiocb) != 0){
> > perror("finish reading err");
> > return ;
> > }*/
> > JNIEnv *env;
> > int resId = (*jvm)->AttachCurrentThread(jvm,(void **)&env,NULL);
> > if(resId < 0){
> > fprintf(stderr,"Cannot attach to JVM thread\n");
> > return ;
> > }
> >
> > jobject req = my_payload->req;
> > jbyteArray buf = my_payload->buf;
> > jbyte* nativeBytes = my_aiocb->aio_buf;
> > (*env)->SetByteArrayRegion(env, buf, my_payload->offset,
> > my_aiocb->aio_nbytes, (jbyte *)nativeBytes);
> > (*env)->CallVoidMethod(env,req,notifyClientMethodId,req);
> > (*env)->DeleteGlobalRef(env,req);
> > (*jvm)->DetachCurrentThread(jvm);
> >
> > free(nativeBytes);
> > free(my_aiocb);
> > free(my_payload);
> > }
> >
> > void printMyaiocb(struct aiocb *myaiocb){
> > printf("fd is %d;offset is %d;length is
> > %d\n",myaiocb->aio_fildes,myaiocb->aio_offset,myaiocb->aio_nbytes);
> > }
> > /*
> > * Class: com_foo_io_AsyRandomAccessFile
> > * Method: read
> > * Signature:
> (IIJILjava/nio/ByteBuffer;Lcom/telenav/io/AsyReadRequest;)V
> > */
> > JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_read
> > (JNIEnv *env, jclass klass, jint fd, jint offset, jlong position, jint
> > length, jbyteArray byteBuf, jobject req){
> > struct aiocb *my_aiocb = (struct aiocb*)malloc(sizeof(struct
> aiocb));
> > payload *my_payload = (payload*)malloc(sizeof(payload));
> >
> > bzero(my_aiocb,sizeof(struct aiocb));
> > bzero(my_payload,sizeof(payload));
> >
> > my_aiocb->aio_fildes = fd;
> > my_aiocb->aio_offset = position;
> > my_aiocb->aio_nbytes = length;
> > my_aiocb->aio_buf = malloc(sizeof(char)*(length+1));
> > my_payload->offset = offset;
> > my_payload->my_aiocb = my_aiocb ;
> > my_payload->req = (*env)->NewGlobalRef(env,req);
> > my_payload->buf = (*env)->NewGlobalRef(env,byteBuf);
> >
> > #ifndef SIGNAL_CALL_BACK
> > my_aiocb->aio_sigevent.sigev_notify = SIGEV_THREAD;
> > my_aiocb->aio_sigevent.sigev_notify_function = finishRead;
> > my_aiocb->aio_sigevent.sigev_notify_attributes = NULL;
> > my_aiocb->aio_sigevent.sigev_value.sival_ptr = my_payload;
> > #endif
> >
> > #ifdef SIGNAL_CALL_BACK
> > my_aiocb->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
> > my_aiocb->aio_sigevent.sigev_signo = SIG_AIO;
> > my_aiocb->aio_sigevent.sigev_value.sival_ptr = my_payload;
> > #endif
> > if(aio_read(my_aiocb) < 0){
> > perror("aio_reading");
> > }
> > }
> >
> > /*
> > * Class: com_foo_io_AsyRandomAccessFile
> > * Method: openFd
> > * Signature: (Ljava/lang/String;I)I
> > */
> > JNIEXPORT jint JNICALL Java_com_foo_io_AsyRandomAccessFile_openFd
> > (JNIEnv *env, jobject raf, jstring fileName, jint acc){
> > int accFlags = O_RDONLY;
> > switch(acc){
> > case ASYRAF_READ:
> > accFlags = O_RDONLY;
> > break;
> > case ASYRAF_WRITE:
> > accFlags = O_WRONLY;
> > break;
> > case ASYRAF_RW:
> > accFlags = O_RDWR;
> > break;
> > }
> > const jchar* charStr =
> (*env)->GetStringUTFChars(env,fileName,NULL);
> > int fd = open(charStr,accFlags);
> > (*env)->ReleaseStringChars(env,fileName,charStr);
> > return fd;
> > }
> > /*
> > * Class: com_foo_io_AsyRandomAccessFile
> > * Method: close
> > * Signature: ()V
> > */
> > JNIEXPORT void JNICALL Java_com_foo_io_AsyRandomAccessFile_close
> > (JNIEnv *env, jobject raf){
> > jint fd = (*env)->CallIntMethod(env,raf,getFdMethodId);
> > close(fd);
> > }
> >
> > //Does not support the previous version
> > jint JNI_OnLoad(JavaVM *vm, void *reserved){
> > JNIEnv *env ;
> > jvm = vm ;
> > int resId = (*vm)->AttachCurrentThread(vm,(void **)&env,NULL);
> > if(resId < 0){
> > fprintf(stderr,"Cannot attach to JVM thread\n");
> > return JNI_VERSION_1_4;
> > }
> > jclass klass =
> > (*env)->FindClass(env,"com/telenav/io/AsyRandomAccessFile");
> > asyRafClass = (*env)->NewGlobalRef(env,klass);
> > getFdMethodId = (*env)->GetMethodID(env,asyRafClass,"getFd","()I");
> > klass = (*env)->FindClass(env,"com/telenav/io/AsyReadRequest");
> > asyReadRequestClass = (*env)->NewGlobalRef(env,klass);
> > getReqFdMethodId =
> > (*env)->GetMethodID(env,asyReadRequestClass,"getFd","()I");
> > getReqOffsetMethodId =
> > (*env)->GetMethodID(env,asyReadRequestClass,"getOffset","()I");
> > getReqPositionMethodId =
> > (*env)->GetMethodID(env,asyReadRequestClass,"getPosition","()J");
> > getReqBufMethodId =
> > (*env)->GetMethodID(env,asyReadRequestClass,"getBs","()[B");
> > getReqLengthMethodId =
> > (*env)->GetMethodID(env,asyReadRequestClass,"getLength","()I");
> > notifyClientMethodId =
> > (*env)->GetMethodID(env,asyReadRequestClass,"notifyClient","()V");
> > (*jvm)->DetachCurrentThread(jvm);
> > #ifdef SIGNAL_CALL_BACK
> > requests_list.head = NULL;
> > requests_list.tail = NULL;
> > requests_list.size = 0;
> > pthread_cond_init(&waitCond,NULL);
> > int ret = pthread_create(&threadId,NULL,startEntry,NULL);
> > if(ret != 0){
> > perror("Create the thread's error!");
> > }
> > sigemptyset(&sig_act.sa_mask);
> > sig_act.sa_flags = SA_SIGINFO;
> > sig_act.sa_sigaction = finishReadHandler;
> > ret = sigaction( SIG_AIO, &sig_act, NULL );
> > if(ret != 0){
> > perror("Hook signal error!");
> > }
> > pthread_mutex_init(&mutex,NULL);
> > #endif
> > printf("ok for jni load\n");
> > return JNI_VERSION_1_4;
> > }
> >
> > void JNI_OnUnload(JavaVM *vm, void *reserved){
> > JNIEnv *env ;
> > jvm = vm ;
> > int resId = (*vm)->AttachCurrentThread(vm,(void **)&env,NULL);
> > if(resId < 0){
> > fprintf(stderr,"Cannot attach to JVM thread\n");
> > return ;
> > }
> > #ifdef SIGNAL_CALL_BACK
> > //Terminate the thread,implement it later
> > #endif
> > (*env)->DeleteGlobalRef(env,asyRafClass);
> > (*jvm)->DetachCurrentThread(jvm);
> > }
> >
> > The performance is improved about 70%~100% compared with the thread
> > notification way.
> > But sadly, it's still slower than Sun's simple implementation. I've
> checked
> > the codes, it just calls read(2) for any read invocation. No special
> > operations.
> > Mentioned by the mail from Sun's employee in OpenJDK, the aio_read is
> > aio_read(3)
> > instead of aio_read(2). That means that's a lib call, not a syscall. The
> > syscall may benefit from the legacy interrupt to improve performance. I
> > also noted that Linux has already put the aio_read implemented in the
> kernel
> > in the Kernel 2.5 or later. But it's still a software way to do that.
> > Indeed, the asynchronous read may improve the throughput, but it needs
> > current application to change their model to event-driven model. My idea
> > that is to replace the RandomAccessFile may not work. Because our
> > application is just work as a block reading way.
> > I don't know if there is any successful large scale system that is using
> the
> > event-driven model especially in large file system's management.(Google
> I
> > guess?)
> > I have no idea that if the experiment should continue. Any of your
> comments
> > are welcome.
> > Thanks.
> >
> >
> > On 1/16/07, yueyu lin <po...@gmail.com> wrote:
> > >
> > > I feel uncomfortable to send mails in the maillist since I didn't ever
> use
> > > Mina. But I think this issue can be a pure technical problem to
> discuss that
> > > may help us to achieve a better performance.
> > >
> > > First, I'll describe how I tested the performance.
> > >
> > >
> > > Test data: File A larger than 1.2 Giga bytes. (Text file)
> > > Test cases: File B to describe the test cases. (Text file)
> > > The format looks like:
> > > offset
> > > (1024 bytes)
> > > ...
> > > This test cases file is generated by a program. It will
> randomly
> > > find an offset in File A and record the offset to File B, retrieve
> 1024
> > > bytes from the offset and write these bytes to the File B. That will
> help to
> > > verify the multiple threads program can get the correct results.
> > >
> > > I have a DataCenter class is to read the test cases from File B and
> give
> > > them to the caller.
> > > public class DataCeneter {
> > >
> > > RandomAccessFile raf = null;
> > >
> > > public DataCeneter(String fileName){
> > > try {
> > > raf = new RandomAccessFile(fileName,"r");
> > > } catch (FileNotFoundException e) {
> > > e.printStackTrace();
> > > }
> > > }
> > >
> > > public synchronized String[] readLines(){
> > > if(raf == null){
> > > return null;
> > > }
> > > try {
> > > String[] ret = new String[2];
> > > ret[0] = raf.readLine();
> > > byte[] bs = new byte[1024];
> > > raf.readFully(bs);
> > > raf.read();
> > > ret[1] = new String(bs);
> > > return ret;
> > > } catch (IOException e) {
> > > return null;
> > > }
> > > }
> > >
> > > }
> > >
> > >
> > > The test program will only ask the threads to call the readLines
> function
> > > to get the test data. The String[0] is the offset and String[1] is the
> > > value. An abstract test class makes the test job easier and fair to
> > every
> > > tester. Different file reader implementations only need to implement
> read(int
> > > position) and init(String fileName) function. Other jobs like
> comparing
> > > results will be the same in every kinds of implementation.
> > >
> > > public abstract class FileReaderTest {
> > >
> > > int threadNum = 3;
> > > DataCeneter dc = null;
> > > int runningThreads ;
> > > Object mutex = new Object();
> > > int amount = 0;
> > >
> > > public FileReaderTest(int threadNum,String fileName,String
> > > srcFileName){
> > > this.threadNum = threadNum;
> > > dc = new DataCeneter(fileName);
> > > runningThreads = threadNum;
> > > init(srcFileName);
> > > }
> > >
> > > public abstract void init(String srcFileName) ;
> > >
> > > public abstract String read(int position);
> > >
> > > public void runTest(){
> > > for(int i = 0;i < threadNum;i ++){
> > > new Thread(new Runnable() {
> > > public void run() {
> > > String[] lines = null;
> > > while( (lines = dc.readLines()) != null ){
> > > String result = read(Integer.parseInt
> (lines[0]));
> > > if(!lines[1].equals(result)){
> > > System.err.println("Wrong output~");
> > > }
> > > amount ++;
> > > }
> > > synchronized (mutex) {
> > > runningThreads --;
> > > mutex.notifyAll();
> > > }
> > > }
> > > }).start();
> > > }
> > >
> > > while(runningThreads > 0){
> > > synchronized (mutex) {
> > > try {
> > > mutex.wait();
> > > } catch (InterruptedException e) {
> > > // TODO Auto-generated catch block
> > > e.printStackTrace();
> > > }
> > > if(runningThreads <= 0)
> > > break;
> > > }
> > > }
> > >
> > > System.out.println("Finished:"+amount);
> > > }
> > > }
> > >
> > >
> > > Then let's see the RandomAccessFileTest implementation.
> > >
> > > public class RafReaderTest extends FileReaderTest {
> > >
> > > RandomAccessFile raf ;
> > >
> > > public RafReaderTest(int threadNum, String fileName, String
> > > srcFileName) {
> > > super(threadNum, fileName, srcFileName);
> > >
> > > }
> > >
> > >
> > >
> > > public void init(String srcFileName) {
> > > try {
> > > raf = new RandomAccessFile(srcFileName,"r");
> > > } catch (FileNotFoundException e) {
> > > e.printStackTrace();
> > > System.exit(-1);
> > > }
> > > }
> > >
> > > public String read(int position) {
> > > byte[] bs = new byte[1024];
> > > int read = 0;
> > > int offset = 0;
> > > int length = 1024;
> > > synchronized(raf){
> > > try {
> > > raf.seek(position);
> > > while(read != 1024){
> > > int r = raf.read(bs, offset, length);
> > > offset += r;
> > > length -= r;
> > > read += r;
> > > }
> > > }catch (IOException e) {
> > > e.printStackTrace();
> > > }
> > > }
> > > return new String(bs);
> > > }
> > >
> > > public static void main(String[] args){
> > > RafReaderTest rrt = new
> > >
> RafReaderTest(5,"/home/yueyulin/access.txt","/home/yueyulin/hugeFile.txt");
> > > long start = System.currentTimeMillis();
> > > rrt.runTest();
> > > long end = System.currentTimeMillis ();
> > > System.out.println("Raf read time is "+(end-start));
> > > }
> > >
> > > }
> > >
> > > If we only open one file descriptor, every time we have to lock the codes
> to
> > > prevent from changing the offset.
> > > I have implemented another class to use Linux aio_read to read data.
> The
> > > file reader test class is
> > > public class AsyReaderTest extends FileReaderTest {
> > >
> > > private AsyRandomAccessFile araf ;
> > >
> > > public AsyReaderTest(int threadNum, String fileName, String
> > > srcFileName) {
> > > super(threadNum, fileName, srcFileName);
> > > }
> > >
> > > @Override
> > > public void init(String srcFileName) {
> > > // TODO Auto-generated method stub
> > > araf = new AsyRandomAccessFile(srcFileName,"r");
> > > }
> > >
> > > @Override
> > > public String read(int position) {
> > > byte[] bs = new byte[1024];
> > > int offset = 0;
> > > int length = 1024;
> > > araf.read(bs, offset, position, length);
> > > return new String(bs);
> > > }
> > >
> > > public static void main(String[] args){
> > > AsyReaderTest rrt = new
> > >
> AsyReaderTest(5,"/home/yueyulin/access.txt","/home/yueyulin/hugeFile.txt");
> > > long start = System.currentTimeMillis();
> > > rrt.runTest();
> > > long end = System.currentTimeMillis ();
> > > System.out.println("Araf read time is "+(end-start));
> > > }
> > >
> > > }
> > >
> > >
> > > I generated a file named access.txt to contain the test cases. It
> contains
> > > 20000 cases about 20 M or so.
> > >
> > > The results are different in different situation.
> > >
> > >
> > > 1. When I run these two test programs first time. The Asynchronous
> > > version is much better than the RandomAccessFile version. The
> detailed
> > > results is :
> > >
> > >
> > > - RandomAccessFileTest: 20000 cases, time: 120 seconds
> > > - AsyRandomAccessFileTest: 20000 cases, time: 70 seconds
> > >
> > > 2 . You will find the speed is slow. That is explainable
> because
> > > the system starts up and everything is not in cache. Linux will cache
> the
> > > most useful data into
> > > memory. So the next time when I run the tests, the
> > > performance is much better.
> > >
> > > - RandomAccessFileTest: 20000 cases, time: 1~1.9 seconds
> > > - AsyRandomAccessFileTest: 20000 cases, time 4~4.5 seconds.
> > >
> > > Although performance is much better than the initial running,
> the
> > > results are not what we expected. The asynchronous version is even
> slower
> > > than the synchronous version in multiple threads environment. (I will
> send
> > > out the detailed implementation of JNI in the later discussion).
> > > I was also impressed by the RandomAccessFile's good
> performance.
> > > So I went into the Java's sources to find why.
> > > I have a copy of Java's source codes. I don't want to describe
> how
> > > to find the place in bunch of codes. I just pick up the related source
> codes
> > > out.
> > > In the $JAVA_SOURCE/j2se/src/share/native/java/io directory,
> there
> > > is a file named "io_util.c" describes how the read does.
> > > /* The maximum size of a stack-allocated buffer.
> > > */
> > > #define BUF_SIZE 8192
> > >
> > >
> > > int
> > > readBytes(JNIEnv *env, jobject this, jbyteArray bytes,
> > > jint off, jint len, jfieldID fid)
> > > {
> > > int nread, datalen;
> > > char stackBuf[BUF_SIZE];
> > > char *buf = 0;
> > > FD fd;
> > >
> > > if (IS_NULL(bytes)) {
> > > JNU_ThrowNullPointerException(env, 0);
> > > return -1;
> > > }
> > > datalen = (*env)->GetArrayLength(env, bytes);
> > >
> > > if ((off < 0) || (off > datalen) ||
> > > (len < 0) || ((off + len) > datalen) || ((off + len) < 0)) {
> > > JNU_ThrowByName(env, "java/lang/IndexOutOfBoundsException",
> 0);
> > > return -1;
> > > }
> > >
> > > if (len == 0) {
> > > return 0;
> > > } else if (len > BUF_SIZE) {
> > > buf = malloc(len);
> > > if (buf == 0) {
> > > JNU_ThrowOutOfMemoryError(env, 0);
> > > return 0;
> > > }
> > > } else {
> > > buf = stackBuf;
> > > }
> > >
> > > fd = GET_FD(this, fid);
> > > nread = IO_Read(fd, buf, len);
> > > if (nread > 0) {
> > > (*env)->SetByteArrayRegion(env, bytes, off, nread, (jbyte
> *)buf);
> > > } else if (nread == JVM_IO_ERR) {
> > > JNU_ThrowIOExceptionWithLastError(env, "Read error");
> > > } else if (nread == JVM_IO_INTR) { /* EOF */
> > > JNU_ThrowByName(env, "java/io/InterruptedIOException", 0);
> > > } else { /* EOF */
> > > nread = -1;
> > > }
> > >
> > > if (buf != stackBuf) {
> > > free(buf);
> > > }
> > > return nread;
> > > }
> > >
> > > The codes above don't contain a lot of tricky codes. The only
> well-known
> > > technical is to use a stack allocation( char stackBuf[BUF_SIZE];) for
> > > small memory allocation instead of asking for memory allocation from
> heap(char
> > > *buf = 0;) every time. But for aio_read, we have no way to use the
> stack
> > > memory because a call back function must be used. And I even don't
> think the
> > > small codes are the key point.
> > > Then I found Sun's implementation using IO_Read(fd, buf, len) to do
> the
> > > real read operation. This macro is defined in
> > > $JAVA_SOURCE/j2se/src/solaris/native/java/io/io_util_md.h.
> > > #define IO_Read JVM_Read
> > >
> > > But when I want to look into the JVM_Read codes, I found nothing. I
> only
> > > found its declaration in jvm.h. In VM.c, I still found nothing
> valuable.
> > > I guess there are should some key codes in the closed(or missing?)
> codes.
> > >
> > > My next plan is to subscribe the openJDK's mail list to ask for the
> codes
> > > of JVM_Read implementation, at least the implementation in POSIX systems.
> > >
> > > Thanks for reading the long and boring mail. In fact, I think when we
> > > limit Java's cross-platform ability, Java can do a lot of things
> elegantly.
> > > But Sun's engineers have done a lot more than what I thought. I wish
> we
> > > would do something to bring the aio really to Java.
> > >
> > > On 1/16/07, Mike Heath <mh...@apache.org> wrote:
> > > >
> > > > The Asynchronous File I/O code is still in an experimental
> state. There
> > > > are some known performance issues and I'm working on ways to
> overcome
> > > > them. I am actively working on the library and I appreciate your
> > > > feedback.
> > > >
> > > > Depending on your situation, you may be able to do all the file
> > > > processing in a separate thread using Java 5 concurrent library to
> get
> > > > asynchronous I/O without the pains of using JNI.
> > > >
> > > > Within the next few weeks, I plan to release some code that does
> > > > asynchronous file I/O through a separate thread. This will work on
> all
> > > > platforms and not just Linux. It will also give me a baseline that
> can
> > > > be used for performance testing the use of POSIX AIO calls. Within
> the
> > > > next few months I plan to do a lot of performance testing and
> tweaking.
> > > > Any help with this would be appreciated.
> > > >
> > > > If you have any additional questions, concerns or other feedback,
> please
> > > > let me know.
> > > >
> > > > -Mike
> > > >
> > > > On Mon, 2007-01-15 at 18:14 +0800, yueyu lin wrote:
> > > > > Hi, everyone,
> > > > > I found in the mina contributors, there is an
> asynchronousFileIO
> > > > that is
> > > > > using aio_read in linux.
> > > > > The URL is
> http://mina.apache.org/asynchronous-file-io-in-java.html
> > > > >
> > > > > It happens that I decide to try to replace the RandomAccessFile
> in
> > > > my
> > > > > project. Our system is a large and busy system based on huge files
> in
> > > > Linux.
> > > > > When profiling, I found that a lot of blocking/waiting in file IO
> on
> > > > the
> > > > > same file descriptor. When there is some operation asking for a
> lot of
> > > > IO
> > > > > requests(it's quite often in our system), other threads will be
> > > > affected.
> > > > > So I just finished an experimental modification that invokes
> > > > aio_read in
> > > > > Linux in JNI. Since our system is running only in Linux, so it
> will be
> > > > good
> > > > > for us.
> > > > > But when I tested the performance, the results depressed me. The
> > > > > performance is three times lower than the RandomAccessFile in 5
> > > > threads.
> > > > >
> > > > > I searched the internet to see if there's any other one to try
> that
> > > > like
> > > > > what I did. Then I found it in the mina contributors. The codes
> are
> > > > almost
> > > > > the same with what I did. The difference is just that I'm using C
> and
> > > > the
> > > > > contributors' codes are using c++.
> > > > > I want to know if you tested the performance and compared with
> the
> > > > sun's
> > > > > RandomAccessFile? What's the result?
> > > > > I also looked into sun's native method opened some time ago. The
> > > > > j2se/share/native/java/io/io_util.c has the detailed codes. It
> doesn't
> > > > have
> > > > > any tricky codes. The only one is that it's using a stack variable
> > > > byte
> > > > > array if the read length is less than 8K. But in the aio_read, it
> > > > doesn't
> > > > > work. Because we have to allocate new memory to contain the
> results
> > > > or
> > > > > using the byte array passed into the native codes. It has to be in
> the
> > > >
> > > > > heap. Even so, I don't think it will affect so much .
> > > > > Thanks in advance.
> > > > > I also want to know how to contact Mike
> > > > > Heath<
> http://cwiki.apache.org/confluence/users/viewuserprofile.action?username=mheath
> > > > >.
> > > > > He contributes the codes.
> > > > >
> > > > > BTW: I'm even using the same development environment with Mike
> > > > Heath.
> > > > > (Ubuntu 6.10).
> > > >
> > > >
> > >
> > >
> > > --
> > > --
> > > Yueyu Lin
> >
> >
> >
> >
> > --
> > --
> > Yueyu Lin
> >
> >
>
--
--
Yueyu Lin