You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/02/24 23:43:52 UTC

svn commit: r511365 - in /mina/sandbox/mheath/aioj/trunk: ./ src/main/c/ src/main/java/org/apache/aio/ src/main/java/org/apache/aio/concurrent/ src/main/java/org/apache/aio/linux/ src/main/java/org/apache/aio/posix/ src/test/java/org/apache/aio/concurr...

Author: mheath
Date: Sat Feb 24 14:43:51 2007
New Revision: 511365

URL: http://svn.apache.org/viewvc?view=rev&rev=511365
Log:
Working aio read support.

Added:
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
Modified:
    mina/sandbox/mheath/aioj/trunk/pom.xml
    mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
    mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java
    mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java
    mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
    mina/sandbox/mheath/aioj/trunk/todo.txt

Modified: mina/sandbox/mheath/aioj/trunk/pom.xml
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/pom.xml?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/pom.xml (original)
+++ mina/sandbox/mheath/aioj/trunk/pom.xml Sat Feb 24 14:43:51 2007
@@ -5,7 +5,7 @@
 
 	<modelVersion>4.0.0</modelVersion>
 	<groupId>org.apache.aio</groupId>
-	<artifactId>aio</artifactId>
+	<artifactId>aioj</artifactId>
 	<version>0.2-SNAPSHOT</version>
 	<name>Asynchronous File I/O in Java</name>
 	<description>

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile Sat Feb 24 14:43:51 2007
@@ -25,7 +25,7 @@
 
 all: $(TARGET)
 
-$(TARGET): org_apache_aio.cpp $(WORKING_DIR)/org_apache_aio_test_Test.h $(WORKING_DIR)/org_apache_aio_posix_PosixAsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_linux_LinuxAsynchronousFileChannel.h
+$(TARGET): org_apache_aio.cpp $(WORKING_DIR)/org_apache_aio_test_Test.h $(WORKING_DIR)/org_apache_aio_posix_PosixAsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_linux_LinuxAsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureImpl.h
 	g++ -shared -lrt -lstdc++ $(INCLUDES) org_apache_aio.cpp -o $(TARGET)
 
 $(WORKING_DIR)/org_apache_aio_test_Test.h: $(TARGET_DIR)/classes/org/apache/aio/test/Test.class
@@ -35,6 +35,10 @@
 $(WORKING_DIR)/org_apache_aio_posix_PosixAsynchronousFileChannel.h: $(TARGET_DIR)/classes/org/apache/aio/posix/PosixAsynchronousFileChannel.class
 	mkdir -p $(TARGET_DIR)/jni
 	javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.posix.PosixAsynchronousFileChannel
+
+$(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureImpl.h: $(TARGET_DIR)/classes/org/apache/aio/posix/PosixAioFutureImpl.class
+	mkdir -p $(TARGET_DIR)/jni
+	javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.posix.PosixAioFutureImpl
 
 $(WORKING_DIR)/org_apache_aio_linux_LinuxAsynchronousFileChannel.h: $(TARGET_DIR)/classes/org/apache/aio/linux/LinuxAsynchronousFileChannel.class
 	mkdir -p $(TARGET_DIR)/jni

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Sat Feb 24 14:43:51 2007
@@ -1,25 +1,78 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
 #include <aio.h>
+#include <errno.h>
 #include <jni.h>
 #include <malloc.h>
+#include <signal.h>
 #include <stdio.h>
 #include <string.h>
 
 #include "org_apache_aio_test_Test.h"
+#include "org_apache_aio_posix_PosixAioFutureImpl.h"
 #include "org_apache_aio_posix_PosixAsynchronousFileChannel.h"
 #include "org_apache_aio_linux_LinuxAsynchronousFileChannel.h"
 
+// The signal number to use for AIO callbacks
+#define SIG_AIO (SIGRTMIN + 5)
+
 #define DEBUG
 
 #ifdef DEBUG
 #define LOG_DEBUG(s) { fprintf(stdout, s); fflush(stdout); }
+#define LOG_DEBUG_PARAM(s, t) { fprintf(stdout, s, t); fflush(stdout); }
 #endif
 
 #ifndef DEBUG
 #define LOG_DEBUG(s)
+#define LOG_DEBUG_PARAM(s, t)
 #endif
 
 #define JNI_VERSION JNI_VERSION_1_4
 
+#define NULL_CHECK(a) { if (a == NULL) { fprintf(stderr, "%s\n", "Error on " #a); return -1;} }
+
+struct buffer_size
+{
+	jint bufferSize; // The size of the buffer
+	jint allocatedBufferSize; // The number of bytes allocated for non-direct ByteBuffers
+	jbyte* buffer; // The address of the buffer
+};
+
+struct aio_request
+{
+	struct aiocb cb;
+	jobject future;
+	jint flags;
+	jint bufferCount;
+	struct buffer_size buffers[];
+};
+
+enum Flags {
+	NO_FLAG,
+	READ_FLAG,
+	WRITE_FLAG,
+	BATCH_READ,
+	BATCH_WRITE 
+};
+
 // --- jvm handler ---
 static JavaVM *jvm;
 
@@ -28,6 +81,31 @@
 static jclass ioException;
 static jclass nullPointerException;
 
+// --- java.lang.Integer ids
+static jclass Integer;
+static jmethodID MID_Integer_valueOf;
+
+// --- java.io.FileDescriptor ids
+static jfieldID FID_fd;
+
+// --- org.apache.aio.posix.PosixAsynchronousFileChannel ids
+static jfieldID FID_posixChannelThreadNotify;
+static jfieldID FID_posixChannelFd;
+
+// --- org.apache.aio.posix.PosixAioFutureImpl
+static jfieldID FID_posixAioFuture_aioRequestPtr;
+static jmethodID MID_posixAioFuture_setValue;
+static jmethodID MID_posixAioFuture_setException;
+static jmethodID MID_posixAioFtgure_setValueInExecutorService;
+
+// --- org.apache.aio.posix.PosixAioFutureImpl.PosixReadWriteAioFuture
+static jclass posixAioReadWriteFuture;
+static jmethodID CID_posixAioReadWriteFuture;
+static jfieldID FID_posixAioReadWriteFuture_buffer;
+
+// --- Signal handler sigaction
+static struct sigaction sig_act;
+
 extern "C"
 {
 	JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved);
@@ -36,6 +114,248 @@
 
 /**********************************************************************************************************************
  * 
+ * Utility functions
+ * 
+ *********************************************************************************************************************/
+
+/*
+ * Returns the OS file descriptor from a PosixAsynchronousFileChannel object
+ */
+inline jint getPosixChannelFD(JNIEnv *env, jobject posixChannel)
+{
+        jobject fd = env->GetObjectField(posixChannel, FID_posixChannelFd);
+        return env->GetIntField(fd, FID_fd);
+}
+
+/*
+ * Allocates an aio_request.  If non-direct ByteBuffers are being used, this function will allocate space on the heap.
+ * For write operations, the data from the non-direct ByteBuffer will need to be copied to the allocated space on the heap.
+ * For read operations, the data stored in the heap will need to be copied to the non-direct ByteBuffer.
+ * 
+ * Params:
+ *   bufferCount - the number of buffers to allocate
+ *   bufferSizeToAllocate - the total number of bytes that need to be allocated on the heap for non-direct ByteBuffers
+ * Returns:
+ *   an aio_request struct ptr
+ */
+struct aio_request *allocateRequest(int bufferCount, int bufferSizeToAllocate)
+{
+	int request_size = sizeof(aio_request) + sizeof(buffer_size) * bufferCount + bufferSizeToAllocate;
+	struct aio_request *request = (aio_request *)calloc(1, request_size); // zero-filled header, buffers[] table and payload area
+	if (request == NULL) return NULL; // out of memory: hand NULL back instead of writing through a NULL pointer
+	request->bufferCount = bufferCount;
+
+	return request;
+}
+
+jbyte* getBufferAddress(struct aio_request *request, int bufferIndex)
+{
+	jbyte *address = (jbyte *)request + sizeof(aio_request) + sizeof(buffer_size) * request->bufferCount; // payload area begins after the header and the buffers[] table (same layout allocateRequest sizes)
+	for (int i = 0; i < bufferIndex && i < request->bufferCount; i++) {
+		address += request->buffers[i].allocatedBufferSize; // skip the heap space reserved for each earlier buffer
+	}
+	return address; // start of the heap space belonging to buffer 'bufferIndex'
+}
+
+void cleanupPosixAio(JNIEnv *env, struct aio_request *request)
+{
+	if (request != NULL) {
+		if (request->future != NULL) {
+			env->DeleteGlobalRef(request->future);
+		}
+		free(request);
+	}
+}
+
+jobject createPosixReadWriteAioFuture(JNIEnv *env, jobject channel, struct aio_request *request, jobject buffer, jlong position)
+{
+	jvalue values[4];
+	values[0].l = channel;
+	values[1].j =(jlong)request;
+	values[2].l = buffer;
+	values[3].j = position;
+	jobject future = env->NewObjectA(posixAioReadWriteFuture, CID_posixAioReadWriteFuture, values);
+	if (future == NULL) {
+		fprintf(stderr, "Could not create posix future object");
+	}
+	return future;
+}
+
+inline jint getByteBufferLimit(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+	jmethodID MID_limit = env->GetMethodID(bufferClass, "limit", "()I");
+	return env->CallIntMethod(buffer, MID_limit);
+} 
+
+inline jint getByteBufferPosition(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+	jmethodID MID_position = env->GetMethodID(bufferClass, "position", "()I");
+	return env->CallIntMethod(buffer, MID_position);
+	
+}
+
+inline void setByteBufferPosition(JNIEnv *env, jobject buffer, jclass bufferClass, jint position)
+{
+	jmethodID MID_setPosition = env->GetMethodID(bufferClass, "position", "(I)Ljava/nio/Buffer;");
+	jobject obj = env->CallObjectMethod(buffer, MID_setPosition, position);
+	env->DeleteLocalRef(obj);
+}
+
+inline jboolean byteBufferHasArray(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+	jmethodID MID_hasArray = env->GetMethodID(bufferClass, "hasArray", "()Z");
+	return env->CallBooleanMethod(buffer, MID_hasArray);
+}
+
+inline jint getByteBufferRemaining(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+	jmethodID MID_remaining = env->GetMethodID(bufferClass, "remaining", "()I");
+	return env->CallIntMethod(buffer, MID_remaining);
+}
+
+inline jbyteArray getByteBufferArray(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+	jmethodID MID_array = env->GetMethodID(bufferClass, "array", "()[B");
+	return (jbyteArray)env->CallObjectMethod(buffer, MID_array);
+}
+
+/**********************************************************************************************************************
+ * 
+ * POSIX AIO Callback functions
+ * 
+ *********************************************************************************************************************/
+
+// Finishes one AIO request: on success updates the ByteBuffer and delivers the byte count to the future; on failure logs the error. Always frees the request.
+void processPosixAioCallback(JNIEnv *env, struct aio_request *request, bool callbackInExecutorService)
+{
+	LOG_DEBUG("Processing callback\n");
+	int ret = aio_error(&request->cb);
+	LOG_DEBUG_PARAM("aio_error returned %d\n", ret);
+	if (ret == 0) {
+		// Request completed successfully
+
+		int totalBytes = aio_return(&request->cb);
+		LOG_DEBUG_PARAM("Processed %d bytes\n", totalBytes);
+		if (totalBytes < 0) {
+			// TODO: Throw exception
+			perror("Error with aio_return");
+			cleanupPosixAio(env, request);
+			return;
+		}
+		if (request->flags & READ_FLAG) {
+			LOG_DEBUG("Handling read\n");
+			// Advance buffer position
+			jobject buffer = env->GetObjectField(request->future, FID_posixAioReadWriteFuture_buffer);
+			if (buffer != NULL) {
+				LOG_DEBUG("Advancing buffer position\n");
+
+				jclass bufferClass = env->GetObjectClass(buffer);
+				int remaining = getByteBufferRemaining(env, buffer, bufferClass);
+				if (remaining < totalBytes) {
+					// TODO: Throw exception
+					cleanupPosixAio(env, request);
+
+					env->DeleteLocalRef(buffer);
+					env->DeleteLocalRef(bufferClass);
+
+					return;
+				}
+
+				// Copy data for heap buffer
+				jint position = getByteBufferPosition(env, buffer, bufferClass);
+				jbyte *address = (jbyte *)env->GetDirectBufferAddress(buffer);
+				if (address == NULL) {
+					if (byteBufferHasArray(env, buffer, bufferClass)) {
+						jbyteArray array = getByteBufferArray(env, buffer, bufferClass);
+						jbyte *arrayPtr = (jbyte *)env->GetPrimitiveArrayCritical(array, NULL);
+						memcpy(arrayPtr + position, request->buffers[0].buffer, totalBytes); // land the data at the buffer's position; NOTE: arrayOffset() is not accounted for
+						env->ReleasePrimitiveArrayCritical(array, arrayPtr, 0);
+						env->DeleteLocalRef(array);
+					} else {
+						// TODO: Handle byte buffers that don't support arrays (no data is copied for them)
+					}
+				}
+				// Direct buffers were filled in place by the kernel; heap buffers were filled by the copy above
+				LOG_DEBUG("Setting buffer position\n");
+				setByteBufferPosition(env, buffer, bufferClass, position + totalBytes);
+
+				env->DeleteLocalRef(bufferClass);
+			}
+			env->DeleteLocalRef(buffer);
+		}
+
+		LOG_DEBUG("Invoke callbacks\n");
+		jobject totalBytesObj = env->CallStaticObjectMethod(Integer, MID_Integer_valueOf, totalBytes);
+		if (callbackInExecutorService) {
+			env->CallVoidMethod(request->future, MID_posixAioFtgure_setValueInExecutorService, totalBytesObj);
+		} else {
+			env->CallVoidMethod(request->future, MID_posixAioFuture_setValue, totalBytesObj);
+		}
+		env->DeleteLocalRef(totalBytesObj);
+		LOG_DEBUG("Done with callbacks\n");
+	} else {
+		const char *msg; // only ever points at string literals
+		switch (ret) {
+			case EFAULT:
+				msg = "Memory fault";
+			break;
+			case EINVAL:
+				msg = "Invalid AIO Control Block";
+			break;
+			case EINPROGRESS:
+				msg = "The AIO request was still in progress when the handler was called";
+			break;
+			case ECANCELED:
+				msg = "The AIO request was cancelled";
+			break;
+			default:
+				msg = "Unkown AIO error";
+			break;
+		}
+		// TODO call future setException method
+		fprintf(stderr, "%s: %s\n", msg, strerror(ret)); // aio_error returns the error code itself; errno is not set here, so perror would print an unrelated message
+	}
+
+	cleanupPosixAio(env, request);
+}
+
+void aioSignalHandler(int signo, siginfo_t *info, void *context)
+{
+	// Make sure we're getting the signal we're expecting
+	if (info->si_signo == SIG_AIO) {
+		LOG_DEBUG("In signal handler\n");
+		
+		JNIEnv *env;
+		jvm->GetEnv((void**)&env, JNI_VERSION);
+		if (env == NULL) {
+			perror("Unable to obtain reference to JNI Environment, can not handle AIO completion\n");
+		} else {
+			struct aio_request *req = (struct aio_request *)info->si_value.sival_ptr;
+			processPosixAioCallback(env, req, true);
+		}
+	}
+}
+
+void aioThreadHandler(sigval_t sigval)
+{
+	LOG_DEBUG("In thread completion handler\n");
+	JNIEnv *env;
+	jint res = jvm->AttachCurrentThread((void**)&env, NULL);
+	if (res < 0) {
+		fprintf(stderr, "Could not attach JVM to AIO thread");
+		fflush(stderr);
+		return;
+	}
+	
+	struct aio_request *req = (aio_request *)sigval.sival_ptr;
+	processPosixAioCallback(env, req, false);
+	
+	jvm->DetachCurrentThread();
+	return;
+}
+
+/**********************************************************************************************************************
+ * 
  * JNI OnLoad and OnUnload functions
  * 
  *********************************************************************************************************************/
@@ -57,14 +377,57 @@
 	
 	jclass cls;
 	
-	cls = env->FindClass("org/apache/aio/AioException");
-	aioException = (jclass)env->NewWeakGlobalRef(cls);
-
-	cls = env->FindClass("java/io/IOException");
-	ioException = (jclass)env->NewWeakGlobalRef(cls);
+	// --- AioException -----------------------------------------------------------------
+	cls = env->FindClass("org/apache/aio/AioException"); NULL_CHECK(cls)
+	aioException = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(aioException)
+
+	// --- IoException ------------------------------------------------------------------
+	cls = env->FindClass("java/io/IOException"); NULL_CHECK(cls)
+	ioException = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(ioException)
+	
+	// --- NullPointerException ---------------------------------------------------------
+	cls = env->FindClass("java/lang/NullPointerException"); NULL_CHECK(cls)
+	nullPointerException = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(nullPointerException)
+	
+	// --- java.lang.Integer ------------------------------------------------------------
+	cls = env->FindClass("java/lang/Integer"); NULL_CHECK(cls)
+	Integer = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(Integer);
+	MID_Integer_valueOf = env->GetStaticMethodID(Integer, "valueOf", "(I)Ljava/lang/Integer;");
 	
-	cls = env->FindClass("java/lang/NullPointerException");
-	nullPointerException = (jclass)env->NewWeakGlobalRef(cls);
+	// --- java.io.FileDescriptor -------------------------------------------------------
+	cls = env->FindClass("java/io/FileDescriptor"); NULL_CHECK(cls)
+	FID_fd = env->GetFieldID(cls, "fd", "I"); NULL_CHECK(FID_fd)
+
+	// --- org.apache.aio.posix.PosixAsynchronousFileChannel ----------------------------
+	LOG_DEBUG("Getting ids for PosixAsynchronousFileChannel\n")
+	cls = env->FindClass("org/apache/aio/posix/PosixAsynchronousFileChannel"); NULL_CHECK(cls)
+	FID_posixChannelThreadNotify = env->GetFieldID(cls, "threadNotify", "Z"); NULL_CHECK(FID_posixChannelThreadNotify)
+	FID_posixChannelFd = env->GetFieldID(cls, "fd", "Ljava/io/FileDescriptor;"); NULL_CHECK(FID_posixChannelFd)
+	
+	// --- org.apache.aio.posix.PosixAioFutureImpl --------------------------------------
+	LOG_DEBUG("Getting ids for PosixAioFutureImpl\n")
+	cls = env->FindClass("org/apache/aio/posix/PosixAioFutureImpl"); NULL_CHECK(cls)
+	FID_posixAioFuture_aioRequestPtr = env->GetFieldID(cls, "aioRequestPtr", "J"); NULL_CHECK(FID_posixAioFuture_aioRequestPtr)
+	MID_posixAioFuture_setValue = env->GetMethodID(cls, "setValue", "(Ljava/lang/Object;)V"); NULL_CHECK(MID_posixAioFuture_setValue)
+	MID_posixAioFtgure_setValueInExecutorService = env->GetMethodID(cls, "setValueInExecutorService", "(Ljava/lang/Object;)V"); NULL_CHECK(MID_posixAioFtgure_setValueInExecutorService) // check the ID that was just looked up
+	MID_posixAioFuture_setException = env->GetMethodID(cls, "setException", "(Ljava/lang/Throwable;)V"); NULL_CHECK(MID_posixAioFuture_setException)
+	
+	// --- org.apache.aio.posix.PosixAioFutureImpl.PosixReadWriteAioFuture --------------
+	LOG_DEBUG("Getting ids for PosixAioFutureImpl.PosixReadWriteAioFuture\n")
+	cls = env->FindClass("org/apache/aio/posix/PosixAioFutureImpl$PosixReadWriteAioFuture"); NULL_CHECK(cls)
+	posixAioReadWriteFuture = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(posixAioReadWriteFuture)
+	CID_posixAioReadWriteFuture = env->GetMethodID(posixAioReadWriteFuture, "<init>", "(Lorg/apache/aio/posix/PosixAsynchronousFileChannel;JLjava/nio/ByteBuffer;J)V"); NULL_CHECK(CID_posixAioReadWriteFuture)
+	FID_posixAioReadWriteFuture_buffer = env->GetFieldID(posixAioReadWriteFuture, "buffer", "Ljava/nio/ByteBuffer;"); NULL_CHECK(FID_posixAioReadWriteFuture_buffer)
+	
+	// --- Initialize signal handler
+	LOG_DEBUG("Setting up signal handler\n")
+	sigemptyset(&sig_act.sa_mask);
+	sig_act.sa_flags = SA_SIGINFO;
+	sig_act.sa_sigaction = aioSignalHandler;
+	int ret = sigaction(SIG_AIO, &sig_act, NULL);
+	if (ret != 0) {
+		env->ThrowNew(aioException, "error registering signal handler");
+	}
 	
 	return JNI_VERSION;
 } 
@@ -79,6 +442,7 @@
 	env->DeleteWeakGlobalRef(aioException);
 	env->DeleteWeakGlobalRef(ioException);
 	env->DeleteWeakGlobalRef(nullPointerException);
+	env->DeleteWeakGlobalRef(posixAioReadWriteFuture);
 }
 
 /**********************************************************************************************************************
@@ -95,7 +459,76 @@
 JNIEXPORT jobject JNICALL Java_org_apache_aio_posix_PosixAsynchronousFileChannel_read__Ljava_nio_ByteBuffer_2J
   (JNIEnv *env, jobject self, jobject buffer, jlong position)
 {
-	LOG_DEBUG("In read function");
+	LOG_DEBUG("In read function\n");
+
+	jclass bufferClass = env->GetObjectClass(buffer);
+	
+	// Get buffer limit and position
+	jint bufferPosition = getByteBufferPosition(env, buffer, bufferClass);
+
+	jint bufferSize = getByteBufferRemaining(env, buffer, bufferClass);
+	jint bufferSizeToAllocate = 0;
+
+	jbyte *address = (jbyte *)env->GetDirectBufferAddress(buffer);
+	if (address == NULL) {
+		LOG_DEBUG("Using heap ByteBuffer\n");
+		bufferSizeToAllocate = bufferSize;
+	} else {
+		LOG_DEBUG("Using direct ByteBuffer\n");
+		address += bufferPosition;
+	}
+	
+	struct aio_request *req = allocateRequest(1, bufferSizeToAllocate);
+	if (req == NULL) { env->ThrowNew(aioException, "could not allocate aio request"); return NULL; }
+	// Setup aio_request fields
+	req->buffers[0].bufferSize = bufferSize;
+	req->buffers[0].allocatedBufferSize = bufferSizeToAllocate;
+	if (address == NULL) {
+		address = getBufferAddress(req, 0);
+	}
+	req->buffers[0].buffer = address;
+	
+	// Create future object
+	jobject future = createPosixReadWriteAioFuture(env, self, req, buffer, position);
+	if (future == NULL) {
+		cleanupPosixAio(env, req); // Error occurred: release the request, then return and let Java throw the exception
+		return NULL;
+	}
+	req->future = env->NewGlobalRef(future);
+	if (req->future == NULL) {
+		cleanupPosixAio(env, req); // Error occurred: release the request, then return and let Java throw the exception
+		return NULL;
+	}
+
+	// Set flag
+	req->flags = READ_FLAG;
+	
+	// Setup aiocb
+	req->cb.aio_fildes = getPosixChannelFD(env, self);
+	req->cb.aio_offset = position;
+	req->cb.aio_buf = address;
+	req->cb.aio_nbytes = bufferSize;
+
+	req->cb.aio_sigevent.sigev_value.sival_ptr = req;
+	if (env->GetBooleanField(self, FID_posixChannelThreadNotify)) {
+		// Do thread notification
+		req->cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
+		req->cb.aio_sigevent.sigev_notify_function = aioThreadHandler;
+	} else {
+		// Do SIG notification
+		req->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+		req->cb.aio_sigevent.sigev_signo = SIG_AIO;
+	}
+	
+	LOG_DEBUG("Calling aio_read\n");
+	int ret = aio_read(&req->cb);
+	if (ret) {
+		// TODO Handle each error return type as appropriate
+		perror("Error issuing aio_read");
+		cleanupPosixAio(env, req);
+		env->ThrowNew(aioException, "error issuing aio_read request");
+	}
+	return future; // never touch req here: it may already be freed (failure above, or the completion handler ran); the value is ignored by the JVM when an exception is pending
 }
 
 /*
@@ -141,6 +574,29 @@
 {
 	LOG_DEBUG("In batch write function");
 	
+}
+
+/**********************************************************************************************************************
+ * 
+ * org.apache.aio.posix.PosixAioFutureImpl method
+ * 
+ *********************************************************************************************************************/
+
+/*
+ * Class:     org_apache_aio_posix_PosixAioFutureImpl
+ * Method:    cancel0
+ * Signature: ()Z
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_aio_posix_PosixAioFutureImpl_cancel0
+  (JNIEnv *env, jobject self)
+{
+	struct aio_request *request = (aio_request *)env->GetLongField(self, FID_posixAioFuture_aioRequestPtr);
+	int ret = aio_cancel(request->cb.aio_fildes, &request->cb);
+	if (ret == AIO_CANCELED) {
+		// Do not free the request here: a cancelled request still receives its completion notification, and processPosixAioCallback releases it
+		return 1;
+	}
+	return 0;
 }
 
 /**********************************************************************************************************************

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Sat Feb 24 14:43:51 2007
@@ -19,6 +19,7 @@
  */
 package org.apache.aio;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -46,7 +47,10 @@
         int getLength();
     }
 
-    public static interface ByteBufferFuture extends AioFuture<Integer>, ByteBufferPosition {}
+    public static interface ByteBufferFuture extends AioFuture<Integer> {
+        public ByteBuffer getBuffer();
+        public long getPosition();
+    }
     
     /**
      * Associates a {@link AioCompletionHandler} with this future.  The completion handler will be invoked when

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java Sat Feb 24 14:43:51 2007
@@ -26,19 +26,31 @@
  * 
  * @author mheath
  */
-public interface ByteBufferPosition {
+public final class ByteBufferPosition {
 
+    private final ByteBuffer buffer;
+    private final long position;
+    
+    public ByteBufferPosition(ByteBuffer buffer, long position) {
+        this.buffer = buffer;
+        this.position = position;
+    }
+    
     /**
      * Fetches the buffer associated with this <tt>ByteBufferPosition</tt>.
      * 
      * @return  The buffer associated with this <tt>ByteBufferPosition</tt>.
      */
-    ByteBuffer getByteBuffer();
+    public final ByteBuffer getBuffer() {
+        return buffer;
+    }
 
     /**
      * 
      * @return
      */
-    long getPosition();
+    public final long getPosition() {
+        return position;
+    }
 
 }

Added: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java?view=auto&rev=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java (added)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java Sat Feb 24 14:43:51 2007
@@ -0,0 +1,45 @@
+package org.apache.aio.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.aio.AioCompletionHandler;
+import org.apache.aio.AioFuture;
+
+public abstract class AbstractAioFutureCallBackHandler<V> implements AioFuture<V> {
+
+    private final List<AioCompletionHandler<V>> completionHandlers = new ArrayList<AioCompletionHandler<V>>();
+    private Object attachment;
+    
+    public AbstractAioFutureCallBackHandler() {
+        super();
+    }
+
+    public synchronized void addCompletionHandler(AioCompletionHandler<V> completionHandler) {
+        completionHandlers.add(completionHandler);
+    }
+
+    public synchronized boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler) {
+        return completionHandlers.remove(completionHandler);
+    }
+
+    public synchronized void callCompletionHandlers() {
+        for (AioCompletionHandler<V> completionHandler : completionHandlers) {
+            try {
+                completionHandler.onCompletion(this);
+            } catch (Throwable e) {
+                // TODO Handle exception
+                e.printStackTrace();
+            }
+        }
+    }
+
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+}
\ No newline at end of file

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java Sat Feb 24 14:43:51 2007
@@ -20,20 +20,17 @@
 package org.apache.aio.concurrent;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.aio.AioCallbackException;
-import org.apache.aio.AioCompletionHandler;
 import org.apache.aio.AioFuture;
 import org.apache.aio.AsynchronousFileChannel;
 import org.apache.aio.ByteBufferPosition;
 
-public class AioFutureImpl<V> implements AioFuture<V> {
+public class AioFutureImpl<V> extends AbstractAioFutureCallBackHandler<V> implements AioFuture<V> {
 
     public static class BatchFutureImpl extends AioFutureImpl<Long> implements BatchFuture {
 
@@ -80,16 +77,16 @@
     
     public static class ByteBufferFutureImpl extends AioFutureImpl<Integer> implements ByteBufferFuture {
 
-        private final ByteBuffer byteBuffer;
+        private final ByteBuffer buffer;
         private final long position;
         
-        public ByteBufferFutureImpl(ByteBuffer byteBuffer, long position) {
-            this.byteBuffer = byteBuffer;
+        public ByteBufferFutureImpl(ByteBuffer buffer, long position) {
+            this.buffer = buffer;
             this.position = position;
         }
         
-        public ByteBuffer getByteBuffer() {
-            return byteBuffer;
+        public ByteBuffer getBuffer() {
+            return buffer;
         }
 
         public long getPosition() {
@@ -98,23 +95,10 @@
         
     }
     
-//    public static class LockFutureImpl extends AbstractAioFuture<FileLock, LockFuture> implements LockFuture {
-//        
-//    }
-//    
-//    public static class OpenFutureImpl extends AbstractAioFuture<AsynchronousFileChannel, OpenFuture> implements OpenFuture {
-//        
-//    }
-//    
-//    public static class SyncFutureImpl
-//    
-//    public static class TruncateFutureImpl
-//    
     private Future<V> future;
     private Object attachment = null;
     private AsynchronousFileChannel asynchronousFileChannel = null;
-    private final List<AioCompletionHandler<V>> completionHandlers = new ArrayList<AioCompletionHandler<V>>();
-    private ExecutionException exception;
+    ExecutionException exception;
 
     public AioFutureImpl() {
         // Default constructor
@@ -122,24 +106,6 @@
     
     public AioFutureImpl(AsynchronousFileChannel channel) {
         this.asynchronousFileChannel = channel;
-    }
-
-    public synchronized void addCompletionHandler(AioCompletionHandler<V> completionHandler) {
-        completionHandlers.add(completionHandler);
-    }
-
-    public synchronized boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler) {
-        return completionHandlers.remove(completionHandler);
-    }
-    
-    public synchronized void callCompletionHandlers() {
-        try {
-            for (AioCompletionHandler<V> completionHandler : completionHandlers) {
-                completionHandler.onCompletion(this);
-            }
-        } catch (Throwable e) {
-            exception = new ExecutionException(e);
-        }
     }
 
     public V get() throws InterruptedException, ExecutionException, AioCallbackException {

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java Sat Feb 24 14:43:51 2007
@@ -68,7 +68,7 @@
         Future<FileLock> future = executorService.submit(new Callable<FileLock>() {
            public FileLock call() throws Exception {
                 FileLock lock = channel.lock(position, size, shared);
-                callCompletionHandlers(aioFuture);
+                callCompletionHandlersInExecutorService(aioFuture);
                 return lock;
             }
 
@@ -86,7 +86,7 @@
         Future<Integer> future = executorService.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
                int bytesRead = channel.read(buffer, position);
-               callCompletionHandlers(byteBufferFuture);
+               callCompletionHandlersInExecutorService(byteBufferFuture);
                return bytesRead;
             } 
         });
@@ -108,9 +108,9 @@
                long length = 0;
                for (int i = 0; i < length; i++) {
                    ByteBufferPosition byteBufferPosition = byteBufferPositions[i + offset];
-                   length += channel.read(byteBufferPosition.getByteBuffer(), byteBufferPosition.getPosition());
+                   length += channel.read(byteBufferPosition.getBuffer(), byteBufferPosition.getPosition());
                }
-               callCompletionHandlers(batchFuture);
+               callCompletionHandlersInExecutorService(batchFuture);
                return length;
             } 
         });
@@ -133,7 +133,7 @@
         Future<Void> future = executorService.submit(new Callable<Void>() {
             public Void call() throws Exception {
                 channel.force(metaData);
-                callCompletionHandlers(aioFuture);
+                callCompletionHandlersInExecutorService(aioFuture);
                 return null;
             } 
         });
@@ -150,7 +150,7 @@
         Future<Void> future = executorService.submit(new Callable<Void>() {
             public Void call() throws Exception {
                 channel.truncate(size);
-                callCompletionHandlers(aioFuture);
+                callCompletionHandlersInExecutorService(aioFuture);
                 return null;
             } 
         });
@@ -176,7 +176,7 @@
         Future<Integer> future = executorService.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
                int bytesWritten = channel.write(buffer, position);
-               callCompletionHandlers(byteBufferFuture);
+               callCompletionHandlersInExecutorService(byteBufferFuture);
                return bytesWritten;
             } 
         });
@@ -198,9 +198,9 @@
                long length = 0;
                for (int i = 0; i < length; i++) {
                    ByteBufferPosition byteBufferPosition = byteBufferPositions[i + offset];
-                   length += channel.write(byteBufferPosition.getByteBuffer(), byteBufferPosition.getPosition());
+                   length += channel.write(byteBufferPosition.getBuffer(), byteBufferPosition.getPosition());
                }
-               callCompletionHandlers(batchFuture);
+               callCompletionHandlersInExecutorService(batchFuture);
                return length;
             } 
         });
@@ -212,7 +212,7 @@
         return channel.isOpen();
     }
 
-    private void callCompletionHandlers(final AioFutureImpl aioFuture) {
+    public void callCompletionHandlersInExecutorService(final AbstractAioFutureCallBackHandler aioFuture) {
         executorService.submit(new Runnable() {
             public void run() {
                 aioFuture.callCompletionHandlers();
@@ -220,4 +220,8 @@
         });
     } 
 
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+    
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java Sat Feb 24 14:43:51 2007
@@ -20,6 +20,7 @@
 package org.apache.aio.concurrent;
 
 import java.io.File;
+import java.io.FileDescriptor;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
 import java.util.Properties;
@@ -36,19 +37,17 @@
 
 public class ConcurrentAsynchronousFileChannelProvider implements AsynchronousFileChannelProvider {
 
-    public static final String PROPERTY_DEFAULT_THREAD_POOL_SIZE =
-            ConcurrentAsynchronousFileChannelProvider.class.getName() + ".defaultThreadPoolSize";
+    public static final String PROPERTY_DEFAULT_THREAD_POOL_SIZE = ConcurrentAsynchronousFileChannelProvider.class
+            .getName()
+            + ".defaultThreadPoolSize";
 
     private static final Integer DEFAULT_THREAD_POOL_SIZE = (Runtime.getRuntime().availableProcessors() + 1) * 2;
 
     private static ExecutorService staticExecutorService;
 
-    public AioFuture<AsynchronousFileChannel> open(
-            final File file,
-            final Modes mode,
-            final ExecutorService executorService,
-            final Properties properties) {
-        
+    public AioFuture<AsynchronousFileChannel> open(final File file, final Modes mode,
+            final ExecutorService executorService, final Properties properties) {
+
         // Determine which ExecutorService to use
         final ExecutorService service;
         if (executorService == null) {
@@ -56,31 +55,34 @@
         } else {
             service = executorService;
         }
-        
+
         final AioFutureImpl<AsynchronousFileChannel> aioFuture = new AioFutureImpl<AsynchronousFileChannel>();
-        
+
         Future<AsynchronousFileChannel> future = service.submit(new Callable<AsynchronousFileChannel>() {
             public AsynchronousFileChannel call() throws Exception {
-                final FileChannel channel = new RandomAccessFile(file, mode.getModeString()).getChannel();
-                AsynchronousFileChannel afc = createAsynchronouseFileChannel(channel, mode, service);
-                
+                final RandomAccessFile raf = new RandomAccessFile(file, mode.getModeString());
+                final FileChannel channel = raf.getChannel();
+                AsynchronousFileChannel afc = createAsynchronouseFileChannel(raf.getFD(), channel, mode, service,
+                        properties);
+
                 // Call completion handlers
                 service.submit(new Runnable() {
-                   public void run() {
-                       aioFuture.callCompletionHandlers();
-                    } 
+                    public void run() {
+                        aioFuture.callCompletionHandlers();
+                    }
                 });
-                
+
                 return afc;
             }
 
         });
         aioFuture.setFuture(future);
-        
+
         return aioFuture;
     }
 
-    protected AsynchronousFileChannel createAsynchronouseFileChannel(final FileChannel channel, final Modes mode, final ExecutorService service) {
+    protected AsynchronousFileChannel createAsynchronouseFileChannel(final FileDescriptor fd,
+            final FileChannel channel, final Modes mode, final ExecutorService service, final Properties properties) {
         return new ConcurrentAsynchronousFileChannel(channel, mode, service);
     }
 

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java Sat Feb 24 14:43:51 2007
@@ -19,7 +19,9 @@
  */
 package org.apache.aio.linux;
 
+import java.io.FileDescriptor;
 import java.nio.channels.FileChannel;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.aio.AsynchronousFileChannel;
@@ -34,7 +36,8 @@
  */
 public class LinuxAsynchronousFileChannelProvider extends ConcurrentAsynchronousFileChannelProvider {
     @Override
-    protected AsynchronousFileChannel createAsynchronouseFileChannel(FileChannel channel, Modes mode, ExecutorService service) {
+    protected AsynchronousFileChannel createAsynchronouseFileChannel(FileDescriptor fd, FileChannel channel,
+            Modes mode, ExecutorService service, Properties properties) {
         return new LinuxAsynchronousFileChannel(channel, mode, service);
     }
 }

Added: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java?view=auto&rev=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java (added)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java Sat Feb 24 14:43:51 2007
@@ -0,0 +1,224 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.aio.posix;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.aio.AioCallbackException;
+import org.apache.aio.AsynchronousFileChannel;
+import org.apache.aio.concurrent.AbstractAioFutureCallBackHandler;
+
+/**
+ * Future tied to one aio request (identified by aioRequestPtr) of a {@link PosixAsynchronousFileChannel}.
+ * Becomes done through {@link #setValue}, {@link #setException} or a successful {@link #cancel}.
+ *
+ * @param <V> type of the result produced by the request
+ */
+public class PosixAioFutureImpl<V> extends AbstractAioFutureCallBackHandler<V> {
+
+    /** Future for a read or write of one buffer; keeps the buffer and file position of the request. */
+    public static class PosixReadWriteAioFuture extends PosixAioFutureImpl<Integer> implements ByteBufferFuture {
+
+        private final ByteBuffer buffer;
+        private final long position;
+
+        public PosixReadWriteAioFuture(PosixAsynchronousFileChannel channel, long aioRequestPtr, ByteBuffer buffer, long position) {
+            super(channel, aioRequestPtr);
+            this.buffer = buffer;
+            this.position = position;
+        }
+
+        public ByteBuffer getBuffer() {
+            return buffer;
+        }
+
+        public long getPosition() {
+            return position;
+        }
+
+    }
+
+    private final PosixAsynchronousFileChannel channel;
+    @SuppressWarnings("unused")
+    private final long aioRequestPtr; // The pointer to the aio request struct (used by native code)
+
+    private final Lock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    // Writers store exception/value before setting done, so a reader that sees done == true also sees the outcome.
+    private volatile Throwable exception;
+    private volatile boolean done = false;
+    private volatile V value = null;
+
+    public PosixAioFutureImpl(PosixAsynchronousFileChannel channel, long aioRequestPtr) {
+        this.channel = channel;
+        this.aioRequestPtr = aioRequestPtr;
+    }
+
+    /**
+     * Waits until the request has completed, failed or been cancelled.
+     *
+     * @return the result of the request
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException if the request failed
+     * @throws CancellationException if the request was cancelled
+     */
+    public V get() throws InterruptedException, ExecutionException, AioCallbackException {
+        // TODO - Come up with a better blocking mechanism than a spin lock that works with signals
+        // The spin polls the volatile flag without holding the lock, so it cannot stall cancel(),
+        // signal() or a timed get() running in another thread.
+        while (!done) {
+            if (Thread.interrupted()) {
+                throw new InterruptedException();
+            }
+            Thread.yield();
+        }
+        return getValue();
+    }
+
+    /**
+     * Waits at most the given time for the request to finish.
+     *
+     * @param timeout maximum time to wait
+     * @param unit unit of the timeout argument
+     * @return the result of the request
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException if the request failed
+     * @throws TimeoutException if the request is still pending when the timeout elapses
+     * @throws CancellationException if the request was cancelled
+     */
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        long nanosLeft = unit.toNanos(timeout);
+        lock.lockInterruptibly();
+        try {
+            // Loop: await may wake spuriously, and a timeout must not be reported as a null result.
+            while (!done) {
+                if (nanosLeft <= 0L) {
+                    throw new TimeoutException();
+                }
+                nanosLeft = condition.awaitNanos(nanosLeft);
+            }
+            return getValue();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    public AsynchronousFileChannel getChannel() {
+        return channel;
+    }
+
+    /**
+     * Asks the native layer to cancel the request.
+     *
+     * @param mayInterruptIfRunning not used by this implementation
+     * @return true if the request was cancelled; false if it was already done or cancel0() refused
+     */
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        if (done) {
+            return false;
+        }
+        boolean canceled = cancel0();
+        if (canceled) {
+            // Publish the outcome before the done flag (see the comment on the volatile fields).
+            exception = new CancellationException();
+            done = true;
+            lock.lock();
+            try {
+                condition.signalAll();
+            } finally {
+                lock.unlock();
+            }
+        }
+        return canceled;
+    }
+
+    private native boolean cancel0();
+
+    public boolean isCancelled() {
+        return done && exception instanceof CancellationException;
+    }
+
+    public boolean isDone() {
+        return done;
+    }
+
+    /**
+     * Returns the stored result or rethrows the stored failure. A CancellationException is rethrown
+     * as is, as java.util.concurrent.Future requires; any other failure is wrapped in an
+     * ExecutionException unless it already is one.
+     */
+    protected V getValue() throws ExecutionException {
+        Throwable failure = exception; // read the volatile field once
+        if (failure instanceof CancellationException) {
+            throw (CancellationException)failure;
+        }
+        if (failure != null) {
+            if (failure instanceof ExecutionException) {
+                throw (ExecutionException)failure;
+            }
+            throw new ExecutionException(failure);
+        }
+        return value;
+    }
+
+    /** Stores the result, marks the future done, then wakes waiters and runs the completion handlers. */
+    protected void setValue(V value) {
+        this.value = value;
+        done = true;
+        signal();
+    }
+
+    /** Completes the future with the given value from a task submitted to the channel's executor service. */
+    protected void setValueInExecutorService(final V value) {
+        channel.getExecutorService().submit(new Runnable() {
+            public void run() {
+                setValue(value);
+            }
+        });
+    }
+
+    /** Stores the failure, marks the future done, then wakes waiters and runs the completion handlers. */
+    public void setException(Throwable exception) {
+        this.exception = exception;
+        done = true;
+        signal();
+    }
+
+    /** Wakes threads blocked in the timed get() and then invokes the completion handlers. */
+    private void signal() {
+        lock.lock();
+        try {
+            condition.signalAll();
+        } finally {
+            lock.unlock();
+        }
+
+        callCompletionHandlers();
+    }
+
+}

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java Sat Feb 24 14:43:51 2007
@@ -1,5 +1,6 @@
 package org.apache.aio.posix;
 
+import java.io.FileDescriptor;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.concurrent.ExecutorService;
@@ -14,8 +15,15 @@
 
 public class PosixAsynchronousFileChannel extends ConcurrentAsynchronousFileChannel {
 
-    public PosixAsynchronousFileChannel(FileChannel channel, Modes mode, ExecutorService executorService) {
+    @SuppressWarnings("unused") // This is used by native code
+    private final boolean threadNotify;
+    @SuppressWarnings("unused")
+    private final FileDescriptor fd;
+    
+    public PosixAsynchronousFileChannel(FileDescriptor fd, FileChannel channel, Modes mode, ExecutorService executorService, boolean threadNotify) {
         super(channel, mode, executorService);
+        this.fd = fd;
+        this.threadNotify = threadNotify;
     }
 
     @Override
@@ -33,4 +41,8 @@
     @Override
     public native BatchFuture write(ByteBufferPosition[] byteBufferPositions, int offset, int length);
     
+    public void test() {
+        System.out.println("This is from Java");
+        System.out.println("I love JNI");
+    }
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java Sat Feb 24 14:43:51 2007
@@ -19,7 +19,9 @@
  */
 package org.apache.aio.posix;
 
+import java.io.FileDescriptor;
 import java.nio.channels.FileChannel;
+import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.aio.AsynchronousFileChannel;
@@ -28,9 +30,18 @@
 
 public class PosixAsynchronousFileChannelProvider extends ConcurrentAsynchronousFileChannelProvider {
 
+    public static final String PROPERTY_THREAD_NOTIFY = PosixAsynchronousFileChannel.class.getName() + ".threadNotify";
+
     @Override
-    protected AsynchronousFileChannel createAsynchronouseFileChannel(FileChannel channel, Modes mode, ExecutorService service) {
-        return new PosixAsynchronousFileChannel(channel, mode, service);
+    protected AsynchronousFileChannel createAsynchronouseFileChannel(FileDescriptor fd, FileChannel channel,
+            Modes mode, ExecutorService service, Properties properties) {
+        String systemDefaultThreadNotify = System.getProperty(PROPERTY_THREAD_NOTIFY, Boolean.FALSE.toString());
+        String threadNotify;
+        if (properties == null) {
+            threadNotify = systemDefaultThreadNotify;
+        } else {
+            threadNotify = properties.getProperty(PROPERTY_THREAD_NOTIFY, systemDefaultThreadNotify);
+        }
+        return new PosixAsynchronousFileChannel(fd, channel, mode, service, Boolean.valueOf(threadNotify));
     }
-
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java Sat Feb 24 14:43:51 2007
@@ -56,7 +56,7 @@
         readFuture.addCompletionHandler(new AioCompletionHandler<Integer>() {
             public void onCompletion(AioFuture<Integer> future) {
                 ByteBufferFuture byteBufferFuture = (ByteBufferFuture)future;
-                ByteBuffer buffer = byteBufferFuture.getByteBuffer();
+                ByteBuffer buffer = byteBufferFuture.getBuffer();
                 assert buffer.position() == buffer.limit();
                 
                 buffer.flip();

Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java Sat Feb 24 14:43:51 2007
@@ -1,6 +1,11 @@
 package org.apache.aio.posix;
 
-import org.apache.aio.linux.LinuxAsynchronousFileChannel;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+
+import org.apache.aio.AioFuture;
+import org.apache.aio.Modes;
 
 public class TestPosixFileChannel {
 
@@ -8,13 +13,35 @@
         System.loadLibrary("aioj");
     }
     
-    public static void main(String[] args) {
-        PosixAsynchronousFileChannel channel = new PosixAsynchronousFileChannel(null, null, null);
-        channel.read(null, 0L);
-        
-        LinuxAsynchronousFileChannel linuxChannel = new LinuxAsynchronousFileChannel(null, null, null);
-        linuxChannel.read(null, 0L);
-        
+    /**
+     * Manual smoke test: reads up to 1000 bytes of /etc/passwd from position 0 through a
+     * PosixAsynchronousFileChannel, waits for the future and prints what was read.
+     *
+     * @param args ignored
+     * @throws Exception if the file cannot be opened or the read fails
+     */
+    public static void main(String[] args) throws Exception {
+        FileInputStream in = new FileInputStream("/etc/passwd");
+        // Fully qualified so this hunk does not depend on an extra import.
+        java.util.concurrent.ExecutorService executor = Executors.newCachedThreadPool();
+        try {
+            PosixAsynchronousFileChannel channel = new PosixAsynchronousFileChannel(
+                    in.getFD(),
+                    in.getChannel(),
+                    Modes.READ_ONLY,
+                    executor, true);
+            ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
+            AioFuture<Integer> future = channel.read(buffer, 0L);
+            future.get();
+            buffer.flip();
+            byte[] bytes = new byte[buffer.remaining()];
+            buffer.get(bytes);
+            System.out.println(new String(bytes));
+            System.out.println("Done!");
+        } finally {
+            // Pool threads are not daemons; without shutdown the JVM lingers after main returns.
+            executor.shutdown();
+            in.close();
+        }
     }
 
 }

Modified: mina/sandbox/mheath/aioj/trunk/todo.txt
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/todo.txt?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/todo.txt (original)
+++ mina/sandbox/mheath/aioj/trunk/todo.txt Sat Feb 24 14:43:51 2007
@@ -1,15 +1,12 @@
 - Add support for heap buffers
-- Throw IO Exceptions when aio calls return an error
+- Throw AIOExceptions when aio calls return an error
 - Figure out batch requests
 - Add support for timeouts -- does AIO itself have support for timeouts?
 - Add support for receiving file modification events
 - Make sure we're checking for errors on all JNI method calls
-- Modify AIOFuture to extend Java 5 Future
-- Look into simplifying AioFuture using generics
-- Update ByteBuffer immediately
 - Add support for aio_fsync
-- Provide an asynchronous open
 - Add support for using Groovy Closures for observers
+- Add support for prefetching ByteBuffer method IDs
 
 === Testing ===
 - Make sure the AsynchronousFileChannel can be unloaded without holding onto native references