You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/01/09 07:23:56 UTC

svn commit: r494327 - in /mina/sandbox/mheath/aioj/trunk: ./ src/main/c/ src/main/java/org/apache/aio/ src/main/java/org/apache/aio/common/

Author: mheath
Date: Mon Jan  8 22:23:54 2007
New Revision: 494327

URL: http://svn.apache.org/viewvc?view=rev&rev=494327
Log:
Added write support

Modified:
    mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
    mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
    mina/sandbox/mheath/aioj/trunk/todo.txt

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile Mon Jan  8 22:23:54 2007
@@ -28,6 +28,11 @@
 $(TARGET): org_apache_aio.cpp $(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h
 	g++ -shared -lrt -lstdc++ $(INCLUDES) org_apache_aio.cpp -o $(TARGET)
 
-$(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h: $(TARGET_DIR)/classes/org/apache/aio/AsynchronousFileChannel.class $(TARGET_DIR)/classes/org/apache/aio/posix/PosixAioFutureReadWrite.class
+$(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h: $(TARGET_DIR)/classes/org/apache/aio/AsynchronousFileChannel.class
 	mkdir -p $(TARGET_DIR)/jni
-	javah -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.AsynchronousFileChannel org.apache.aio.posix.PosixAioFutureReadWrite
\ No newline at end of file
+	javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.AsynchronousFileChannel
+
+$(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h: $(TARGET_DIR)/classes/org/apache/aio/posix/PosixAioFutureReadWrite.class
+	mkdir -p $(TARGET_DIR)/jni
+	javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.posix.PosixAioFutureReadWrite
+	
\ No newline at end of file

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Mon Jan  8 22:23:54 2007
@@ -16,10 +16,19 @@
 #define LOG_DEBUG(s)
 #endif
 
+struct aio_request
+{
+	struct aiocb aio;
+	jobject operation;
+	jobject future;
+	jint allocated_buffer_size;
+	jbyte buffer[];
+};
+
 // --- jvm handler ---
+static JavaVM *jvm;
 
 // --- Exception classes ---
-static JavaVM *jvm;
 static jclass ioException;
 static jclass nullPointerException;
 
@@ -31,9 +40,11 @@
 static jmethodID abstractAioFuture_processFutureListenersID;
 static jmethodID abstractAioFuture_handleErrorID;
 
+// --- IDs for file descriptors ---
 static jfieldID fdID; // ID for java.io.FileDescriptor.fd
 static jfieldID fieldDescID; // ID for org.apache.aio.AsynchronousFileChannel.fd
 
+// --- Operation enums ---
 static jobject operationRead;
 static jobject operationWrite;
 static jobject operationBatchRead;
@@ -43,12 +54,11 @@
 {
 	JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved);
 	JNIEXPORT void JNICALL JNI_OnUnload(JavaVM *vm, void *reserved);
-	void aio_read_completion_handler(sigval_t sigval);
-	void aio_write_completion_handler(sigval_t sigval);
+	void aio_read_write_completion_handler(sigval_t sigval);
 }
 
 // --- Utility Functions ----------------------------------------------------
-jint getFD(JNIEnv *env, jobject asynchFileChannel)
+inline jint getFD(JNIEnv *env, jobject asynchFileChannel)
 {
 	jobject fieldDesc = env->GetObjectField(asynchFileChannel, fieldDescID);
 	return env->GetIntField(fieldDesc, fdID);
@@ -58,13 +68,16 @@
 {
 	jclass cls = env->GetObjectClass(buffer);
 	jmethodID MID_position = env->GetMethodID(cls, "position", "()I");
-	return env->CallIntMethod(buffer, MID_position); 
+	env->DeleteLocalRef(cls);
+	jint ret = env->CallIntMethod(buffer, MID_position);
+	return  ret;
 }
 
 inline void setBufferPosition(JNIEnv *env, jobject buffer, jint position)
 {
 	jclass cls = env->GetObjectClass(buffer);
 	jmethodID MID_position = env->GetMethodID(cls, "position", "(I)Ljava/nio/Buffer;");
+	env->DeleteLocalRef(cls);
 	env->CallVoidMethod(buffer, MID_position, position);
 }
 
@@ -72,6 +85,7 @@
 {
 	jclass cls = env->GetObjectClass(buffer);
 	jmethodID MID_limit = env->GetMethodID(cls, "limit", "()I");
+	env->DeleteLocalRef(cls);
 	return env->CallIntMethod(buffer, MID_limit); 
 }
 
@@ -79,10 +93,11 @@
 {
 	jclass cls = env->GetObjectClass(buffer);
 	jmethodID MID_limit = env->GetMethodID(cls, "limit", "(I)Ljava/nio/Buffer;");
+	env->DeleteLocalRef(cls);
 	env->CallVoidMethod(buffer, MID_limit, limit);
 }
 
-jobject createPosixAioFutureReadWrite(JNIEnv *env, jobject channel, jobject operation, void *aiocb, jobject buffer, long position)
+inline jobject createPosixAioFutureReadWrite(JNIEnv *env, jobject channel, jobject operation, void *aiocb, jobject buffer, long position)
 {
 	jvalue values[5];
 	values[0].l = channel;
@@ -94,43 +109,58 @@
 	return env->NewGlobalRef(future);
 }
 
-struct aiocb *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, jobject buffer, jlong position, jobject operation)
+struct aio_request *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, jobject buffer, jlong position, jobject operation)
 {
-		// Get address and capacity of buffer
+	// Get address and capacity of buffer
 	if (buffer == NULL) {
 		env->ThrowNew(nullPointerException, "buffer cannot be null");
+		return NULL;
 	}
-	// TODO: Add support for non direct byte buffers
+	// Adjust bufferAddress by the position of the buffer
+	jint bufferPosition = getBufferPosition(env, buffer);
+	jint bufferLimit = getBufferLimit(env, buffer)
+	jint bufferSize = bufferLimit - bufferPosition;
+
+	jint bufferToAllocate;
 	jbyte *bufferAddress = (jbyte *)env->GetDirectBufferAddress(buffer);
 	if (bufferAddress == NULL) {
-		env->ThrowNew(ioException, "Must use direct ByteBuffer");
-		return NULL;
+		bufferToAllocate = bufferSize;
+	} else {
+		bufferAddress += bufferPosition; 
+		bufferToAllocate = 0;
 	}
-	// Adjust bufferAddress by the position of the buffer
-	jint bufferPosition = getBufferPosition(env, buffer); 
-	bufferAddress += bufferPosition; 
-	jint bufferSize = getBufferLimit(env, buffer) - position;
 
 	// Allocate aiocb
-	struct aiocb *req = (aiocb*)malloc(sizeof(aiocb));
-	bzero(req, sizeof(struct aiocb));
+	int aio_request_size = sizeof(aio_request) + bufferToAllocate;
+	struct aio_request *req = (aio_request *)malloc(aio_request_size);
+	bzero(req, aio_request_size);
+	
+	if (bufferAddress == NULL) {
+		req->allocated_buffer_size = bufferToAllocate;
+		bufferAddress = req->buffer;
+		
+		//jbytearray env->NewByteArray(bufferSize);
+		
+		// TODO: For write, read data from buffer and put into allocated buffer and forward position
+	}
+	
+	// TODO: For write, increment position
 	
-	jobject future = createPosixAioFutureReadWrite(env, asynchronousFileChannel, operation, req, buffer, position);
+	req->future = createPosixAioFutureReadWrite(env, asynchronousFileChannel, operation, req, buffer, position);
 
 	// Setup aiocb
-	req->aio_fildes = getFD(env, asynchronousFileChannel);
-	req->aio_offset = position;
-	req->aio_buf = bufferAddress;
-	req->aio_nbytes = bufferSize;
-	
-	req->aio_sigevent.sigev_notify = SIGEV_THREAD;
-	if (operation == operationRead) {
-		req->aio_sigevent.sigev_notify_function = aio_read_completion_handler;
-	} else if (operation == operationWrite) {
-		req->aio_sigevent.sigev_notify_function = aio_write_completion_handler;
-	}
-	req->aio_sigevent.sigev_notify_attributes = NULL;
-	req->aio_sigevent.sigev_value.sival_ptr = future;
+	req->aio.aio_fildes = getFD(env, asynchronousFileChannel);
+	req->aio.aio_offset = position;
+	req->aio.aio_buf = bufferAddress;
+	req->aio.aio_nbytes = bufferSize;
+	
+	// Set other aio request fields
+	req->operation = operation;
+	
+	req->aio.aio_sigevent.sigev_notify = SIGEV_THREAD;
+	req->aio.aio_sigevent.sigev_notify_function = aio_read_write_completion_handler;
+	req->aio.aio_sigevent.sigev_notify_attributes = NULL;
+	req->aio.aio_sigevent.sigev_value.sival_ptr = req;
 	
 	return req;
 }
@@ -214,14 +244,15 @@
 	fprintf(stdout, "aio write at file position %d\n", position);
 	fflush(stdout);
 #endif
-	struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationWrite);
-	int ret = aio_write(req);
+	struct aio_request *req = setupAioRequest(env, obj, buffer, position, operationWrite);
+	
+	int ret = aio_write(&req->aio);
 	if (ret) {
 		// TODO Handle errors from ret
 		// return a null on error.
 		return NULL;
 	}
-	return (jobject)req->aio_sigevent.sigev_value.sival_ptr;
+	return req->future;
 }
 
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_read
@@ -231,9 +262,9 @@
 	fprintf(stdout, "aio read at file position %d\n", position);
 	fflush(stdout);
 #endif
-	struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationRead);
+	struct aio_request *req = setupAioRequest(env, obj, buffer, position, operationRead);
 	LOG_DEBUG("Do aio read\n");
-	int ret = aio_read(req);
+	int ret = aio_read(&req->aio);
 	if (ret) {
 		// TODO Handle errors from ret
 		// return a null on error.
@@ -241,7 +272,7 @@
 	}
 
 	// Return future object	
-	return (jobject)req->aio_sigevent.sigev_value.sival_ptr;
+	return req->future;
 }
 
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchRead
@@ -268,8 +299,8 @@
 JNIEXPORT jboolean JNICALL Java_org_apache_aio_posix_PosixAioFutureReadWrite_cancel
   (JNIEnv *env, jobject obj)
 {
-	struct aiocb *req = (struct aiocb *)env->GetLongField(obj, posixAioFutureReadWrite_aiocbPtrID);
-	int ret = aio_cancel(req->aio_fildes, req);
+	struct aio_request *req = (struct aio_request *)env->GetLongField(obj, posixAioFutureReadWrite_aiocbPtrID);
+	int ret = aio_cancel(req->aio.aio_fildes, &req->aio);
 	if (ret == AIO_CANCELED) {
 		free(req);
 		return 1;
@@ -278,65 +309,48 @@
 }
 
 // --- Completion handlers ----------------------------------------------------------------
-enum operation_mode {READ, WRITE};
-
-void aio_read_write_completion_handler(sigval_t sigval, operation_mode mode, char *jvmAttachErrorMessage, char *failureMessage)
+void aio_read_write_completion_handler(sigval_t sigval)
 {
+	LOG_DEBUG("In AIO completion handler\n")
+
 	JNIEnv *env;
 	jint res = jvm->AttachCurrentThread((void**)&env, NULL);
 	if (res < 0) {
-		fprintf(stderr, jvmAttachErrorMessage);
+		fprintf(stderr, "Could not attach JVM to AIO thread");
 		fflush(stderr);
 		return;
 	}
 	
-	jobject future = (jobject)sigval.sival_ptr;
-	struct aiocb *req = (struct aiocb *)env->GetLongField(future, posixAioFutureReadWrite_aiocbPtrID);
+	struct aio_request *req = (aio_request *)sigval.sival_ptr;
 	
 	/* Did the request complete? */
-	if (aio_error(req) == 0) {
+	if (aio_error(&req->aio) == 0) {
 		/* Request completed successfully, get number of bytes processed */
-		int ret = aio_return( req );
-		
-		if (mode == READ) {
+		int ret = aio_return(&req->aio);
+
+		if (req->operation == operationRead) {
+			jobject buffer = env->GetObjectField(req->future, posixAioFutureReadWrite_bufferID);
 			// Adjust buffer limit
-			jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
+			// TODO: Add support for writing read data to heap byte buffer 
+
+			// Get buffer position
 			int limit = ret + getBufferPosition(env, buffer);
 			setBufferLimit(env, buffer, limit);
-		} else if (mode == WRITE) {
-			// Adjust buffer position
-			jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
-			int position = getBufferPosition(env, buffer) + ret;
-			setBufferPosition(env, buffer, position);
 		}
 		
 		// Call aio listeners
-		env->CallVoidMethod(future, abstractAioFuture_processFutureListenersID);
+		env->CallVoidMethod(req->future, abstractAioFuture_processFutureListenersID);
 	} else {
 		// TODO Write a unit test for this
-		env->ThrowNew(ioException, failureMessage);
+		env->ThrowNew(ioException, "Could not complete AIO request");
 		jthrowable e = env->ExceptionOccurred();
-		env->CallVoidMethod(future, abstractAioFuture_handleErrorID, e);
+		env->CallVoidMethod(req->future, abstractAioFuture_handleErrorID, e);
 	}
 		
 	// Free resources		
+	env->DeleteGlobalRef(req->future);
 	free(req);
-	env->DeleteGlobalRef(future);
 
 	jvm->DetachCurrentThread();
 	return;
 } 
-
-void aio_read_completion_handler(sigval_t sigval)
-{
-	LOG_DEBUG("In read completion handler\n")
-	
-	aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM to AIO read thread\n", "Could not complete read request");
-}
-
-void aio_write_completion_handler(sigval_t sigval)
-{
-	LOG_DEBUG("In write completion handler\n")
-
-	aio_read_write_completion_handler(sigval, WRITE, "Failed to attach JVM to AIO write thread\n", "Could not complete write request");
-}

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Mon Jan  8 22:23:54 2007
@@ -4,6 +4,12 @@
 
 public interface AioFuture<T extends AioFuture> {
 
+	public boolean cancel();
+
+	public boolean isCancelled();
+	
+	public boolean isDone();
+
 	public void addListener(AioFutureListener<T> ioFutureListener);
 	
 	public void removeListener(AioFutureListener<T> ioFutureListener);
@@ -14,17 +20,11 @@
 	
 	public AsynchronousFileChannel getChannel();
 	
-	public boolean isCompleted();
-
-	public boolean isSuccessful();
-	
 	public Throwable getException();
 	
 	public void join() throws InterruptedException;
 	
 	public boolean join(long timeout, TimeUnit timeUnit) throws InterruptedException;
-	
-	public boolean cancel();
 	
 	public Operation getOperation();
 	

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java Mon Jan  8 22:23:54 2007
@@ -1,12 +1,19 @@
 package org.apache.aio;
 
+import java.io.Closeable;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
 
-public class AsynchronousFileChannel {
+/**
+ * 
+ * @author Mike Heath <mh...@apache.org>
+ */
+public class AsynchronousFileChannel implements Closeable, Channel {
 
 	static {
+		// TODO Set up some loader so we can store the .so in the aioj.jar
 		System.loadLibrary("aioj");
 	}
 
@@ -16,9 +23,8 @@
 		if (fd == null) {
 			throw new NullPointerException("fd cannot be null");
 		}
-		SecurityManager sm = System.getSecurityManager();
-		if (sm != null) {
-			sm.checkWrite(fd);
+		if (!fd.valid()) {
+			throw new IOException("fd must be a valid FileDescriptor");
 		}
 		this.fd = fd;
 	}
@@ -31,11 +37,20 @@
 		return fd.valid();
 	}
 	
-	public native AioFutureReadWrite write(ByteBuffer buffer, long position) throws IOException;
+	/**
+	 * Calling close closes the AsynchronousFileChannel but does not actually close
+	 * the file.  You must close the file object where the file descriptor originated from.
+	 *
+	 */
+	public void close() {
+		fd = new FileDescriptor();
+	}
 	
 	public native AioFutureReadWrite read(ByteBuffer buffer, long position) throws IOException;
 	
 	public native AioFutureBatch batchRead(BatchRequest... reads) throws IOException;
+	
+	public native AioFutureReadWrite write(ByteBuffer buffer, long position) throws IOException;
 	
 	public native AioFutureBatch batchWrite(BatchRequest... writes) throws IOException;
 	

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java Mon Jan  8 22:23:54 2007
@@ -57,11 +57,11 @@
 		return operation;
 	}
 
-	public boolean isCompleted() {
+	public boolean isDone() {
 		return completed;
 	}
 	
-	public boolean isSuccessful() {
+	public boolean isCancelled() {
 		return exception == null;
 	}
 	
@@ -79,7 +79,7 @@
 		if (!completed) {
 			wait(timeUnit.toMillis(timeout));
 		}
-		return isCompleted();
+		return isDone();
 	}
 
 	@SuppressWarnings("unchecked")

Modified: mina/sandbox/mheath/aioj/trunk/todo.txt
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/todo.txt?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/todo.txt (original)
+++ mina/sandbox/mheath/aioj/trunk/todo.txt Mon Jan  8 22:23:54 2007
@@ -1,13 +1,23 @@
 - Add support for heap buffers
 - Throw IO Exceptions when aio calls return an error
 - Figure out batch requests
-- Add support for timeouts
+- Add support for timeouts -- does AIO itself have support for timeouts?
 - Add support for receiving file modification events
 - Make sure we're checking for errors on all JNI method calls
+- Modify AIOFuture to extend Java 5 Future
+- Look into simplifying AioFuture using generics
+- Update ByteBuffer immediately
+- Add support for aio_fsync
+- Provide an asynchronous open
 
 === Testing ===
 - Make sure the AsynchronousFileChannel can be unloaded without holding onto native references
 - Test for memory leaks
 - Do performance tests
 - Write unit tests
-- Determine if doing AttachCurrentThread or AttachCurrentThreadAsDaemon is faster in callback
\ No newline at end of file
+- Determine if doing AttachCurrentThread or AttachCurrentThreadAsDaemon is faster in callback
+- Do a test using util.concurrent asynch I/O
+
+=== Things to ask Alan Bateman ===
+- What about asynchronous file opens?
+- Is there an easy way to get to the ByteBuffer in a completion handler when doing a read?