You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2006/11/27 21:47:38 UTC

svn commit: r479760 - in /mina/sandbox/mheath/aioj/trunk: ./ src/main/c/ src/main/java/org/apache/aio/ src/main/java/org/apache/aio/common/ src/test/java/

Author: mheath
Date: Mon Nov 27 12:47:37 2006
New Revision: 479760

URL: http://svn.apache.org/viewvc?view=rev&rev=479760
Log:
Add error handling callbacks.

Modified:
    mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
    mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java
    mina/sandbox/mheath/aioj/trunk/todo.txt

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Mon Nov 27 12:47:37 2006
@@ -6,8 +6,19 @@
 #include "org_apache_aio_AsynchronousFileChannel.h"
 #include "org_apache_aio_posix_PosixAioFutureReadWrite.h"
 
-#define DEBUG 1
+#define DEBUG
 
+#ifdef DEBUG
+#define LOG_DEBUG(s) { fprintf(stdout, s); fflush(stdout); }
+#endif
+
+#ifndef DEBUG
+#define LOG_DEBUG(s)
+#endif
+
+// --- jvm handler ---
+
+// --- Exception classes ---
 static JavaVM *jvm;
 static jclass ioException;
 static jclass nullPointerException;
@@ -15,9 +26,10 @@
 // --- Classes and IDs for PosixAioFutureReadWrite ---
 static jclass posixAioFutureReadWrite;
 static jmethodID CID_posixAioFutureReadWrite;
-static jmethodID posixAioFutureReadWrite_processFutureListenersID;
 static jfieldID posixAioFutureReadWrite_aiocbPtrID;
 static jfieldID posixAioFutureReadWrite_bufferID;
+static jmethodID abstractAioFuture_processFutureListenersID;
+static jmethodID abstractAioFuture_handleErrorID;
 
 static jfieldID fdID; // ID for java.io.FileDescriptor.fd
 static jfieldID fieldDescID; // ID for org.apache.aio.AsynchronousFileChannel.fd
@@ -85,14 +97,12 @@
 struct aiocb *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, jobject buffer, jlong position, jobject operation)
 {
 		// Get address and capacity of buffer
-	if (buffer == NULL)
-	{
+	if (buffer == NULL) {
 		env->ThrowNew(nullPointerException, "buffer cannot be null");
 	}
 	// TODO: Add support for non direct byte buffers
 	jbyte *bufferAddress = (jbyte *)env->GetDirectBufferAddress(buffer);
-	if (bufferAddress == NULL)
-	{
+	if (bufferAddress == NULL) {
 		env->ThrowNew(ioException, "Must use direct ByteBuffer");
 		return NULL;
 	}
@@ -114,8 +124,7 @@
 	req->aio_nbytes = bufferSize;
 	
 	req->aio_sigevent.sigev_notify = SIGEV_THREAD;
-	if (operation == operationRead)
-	{
+	if (operation == operationRead) {
 		req->aio_sigevent.sigev_notify_function = aio_read_completion_handler;
 	} else if (operation == operationWrite) {
 		req->aio_sigevent.sigev_notify_function = aio_write_completion_handler;
@@ -130,18 +139,16 @@
 JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved)
 {
 	jint JNIversion = JNI_VERSION_1_4;
-	if (DEBUG)
-	{
-		fprintf(stdout, "Initializing native code for JNI version 0x%x\n", JNIversion);
-		fflush(stdout);
-	}
+#ifdef DEBUG
+	fprintf(stdout, "Initializing native code for JNI version 0x%x\n", JNIversion);
+	fflush(stdout);
+#endif
 	
 	// Initialize static jvm pointer.
 	jvm = vm;
 	
 	JNIEnv *env;
-	if (vm->GetEnv((void**)&env, JNIversion))
-	{
+	if (vm->GetEnv((void**)&env, JNIversion)) {
 		return JNI_ERR;
 	}
 	
@@ -155,10 +162,13 @@
 	cls = env->FindClass("org/apache/aio/posix/PosixAioFutureReadWrite");
 	posixAioFutureReadWrite = (jclass)env->NewWeakGlobalRef(cls);
 	CID_posixAioFutureReadWrite = env->GetMethodID(posixAioFutureReadWrite, "<init>", "(Lorg/apache/aio/AsynchronousFileChannel;Lorg/apache/aio/Operation;JLjava/nio/ByteBuffer;J)V");
-	posixAioFutureReadWrite_processFutureListenersID = env->GetMethodID(posixAioFutureReadWrite, "processFutureListeners", "()V");
 	posixAioFutureReadWrite_aiocbPtrID = env->GetFieldID(posixAioFutureReadWrite, "aiocbPtr", "J"); 
 	posixAioFutureReadWrite_bufferID = env->GetFieldID(posixAioFutureReadWrite, "buffer", "Ljava/nio/ByteBuffer;");
 	
+	cls = env->FindClass("org/apache/aio/common/AbstractAioFuture");
+	abstractAioFuture_processFutureListenersID = env->GetMethodID(cls, "processFutureListeners", "()V");
+	abstractAioFuture_handleErrorID = env->GetMethodID(cls, "handleError", "(Ljava/lang/Throwable;)V");
+	
 	cls = env->FindClass("java/io/FileDescriptor");
 	fdID = env->GetFieldID(cls, "fd", "I");
 	
@@ -183,8 +193,7 @@
 {
 	jint JNIversion = JNI_VERSION_1_4;
 	JNIEnv *env;
-	if (vm->GetEnv((void**)&env, JNIversion))
-	{
+	if (vm->GetEnv((void**)&env, JNIversion)) {
 		return;
 	}
 	
@@ -201,15 +210,13 @@
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_write
   (JNIEnv *env, jobject obj, jobject buffer, jlong position)
 {
-	if (DEBUG)
-	{
-		fprintf(stdout, "aio write at file position %d\n", position);
-		fflush(stdout);
-	}
+#ifdef DEBUG
+	fprintf(stdout, "aio write at file position %d\n", position);
+	fflush(stdout);
+#endif
 	struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationWrite);
 	int ret = aio_write(req);
-	if (ret)
-	{
+	if (ret) {
 		// TODO Handle errors from ret
 		// return a null on error.
 		return NULL;
@@ -220,20 +227,14 @@
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_read
   (JNIEnv *env, jobject obj, jobject buffer, jlong position)
 {
-	if (DEBUG)
-	{
-		fprintf(stdout, "aio read at file position %d\n", position);
-		fflush(stdout);
-	}
+#ifdef DEBUG
+	fprintf(stdout, "aio read at file position %d\n", position);
+	fflush(stdout);
+#endif
 	struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationRead);
-	if (DEBUG)
-	{
-		fprintf(stdout, "Do aio read\n");
-		fflush(stdout);
-	}
+	LOG_DEBUG("Do aio read\n");
 	int ret = aio_read(req);
-	if (ret)
-	{
+	if (ret) {
 		// TODO Handle errors from ret
 		// return a null on error.
 		return NULL;
@@ -246,21 +247,21 @@
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchRead
   (JNIEnv *env, jobject obj, jobjectArray batch)
 {
-	printf("batchRead\n");
+	LOG_DEBUG("batchRead\n")
 	return NULL;
 }
 
 JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchWrite
   (JNIEnv *env, jobject obj, jobjectArray batch)
 {
-	printf("batchWrite\n");
+	LOG_DEBUG("batchWrite\n");
 	return NULL;
 }
 
 JNIEXPORT void JNICALL Java_org_apache_aio_AsynchronousFileChannel_suspend
   (JNIEnv *env, jobject obj, jobjectArray futures)
 {
-	printf("suspend\n");
+	LOG_DEBUG("suspend\n");
 }
 
 // --- PosixAioFutureReadWrite methods ---------------------------------------------------
@@ -269,8 +270,7 @@
 {
 	struct aiocb *req = (struct aiocb *)env->GetLongField(obj, posixAioFutureReadWrite_aiocbPtrID);
 	int ret = aio_cancel(req->aio_fildes, req);
-	if (ret == AIO_CANCELED)
-	{
+	if (ret == AIO_CANCELED) {
 		free(req);
 		return 1;
 	} 
@@ -278,19 +278,16 @@
 }
 
 // --- Completion handlers ----------------------------------------------------------------
-void aio_read_completion_handler(sigval_t sigval)
-{
-	if (DEBUG)
-	{
-		fprintf(stdout, "In read completion handler\n");
-		fflush(stdout);
-	}
+// TODO: Consolodate completion handlers
+enum operation_mode {READ, WRITE};
 
+void aio_read_write_completion_handler(sigval_t sigval, operation_mode mode, char *jvmAttachErrorMessage, char *failureMessage)
+{
 	JNIEnv *env;
 	jint res = jvm->AttachCurrentThread((void**)&env, NULL);
-	if (res < 0)
-	{
-		fprintf(stderr, "Failed to attach JVM to AIO read thread\n");
+	if (res < 0) {
+		fprintf(stderr, jvmAttachErrorMessage);
+		fflush(stderr);
 		return;
 	}
 	
@@ -302,68 +299,45 @@
 		/* Request completed successfully, get number of bytes processed */
 		int ret = aio_return( req );
 		
-		// Adjust buffer limit
-		jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
-		int limit = ret + getBufferPosition(env, buffer);
-		setBufferLimit(env, buffer, limit);
+		if (mode == READ) {
+			// Adjust buffer limit
+			jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
+			int limit = ret + getBufferPosition(env, buffer);
+			setBufferLimit(env, buffer, limit);
+		} else if (mode == WRITE) {
+			// Adjust buffer position
+			jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
+			int position = getBufferPosition(env, buffer) + ret;
+			setBufferPosition(env, buffer, position);
+		}
 		
 		// Call aio listeners
-		env->CallVoidMethod(future, posixAioFutureReadWrite_processFutureListenersID);
-		
-		// Free resources		
-		free(req);
-		env->DeleteGlobalRef(future);
+		env->CallVoidMethod(future, abstractAioFuture_processFutureListenersID);
 	} else {
-		// TODO Find a way to handle exception here
-		fprintf(stderr, "ERROR: AIO read request did NOT complete\n");
-		fflush(stderr);
+		// TODO Write a unit test for this
+		env->ThrowNew(ioException, failureMessage);
+		jthrowable e = env->ExceptionOccurred();
+		env->CallVoidMethod(future, abstractAioFuture_handleErrorID, e);
 	}
+		
+	// Free resources		
+	free(req);
+	env->DeleteGlobalRef(future);
 
 	jvm->DetachCurrentThread();
 	return;
+} 
+
+void aio_read_completion_handler(sigval_t sigval)
+{
+	LOG_DEBUG("In read completion handler\n")
+	
+	aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM to AIO read thread\n", "Could not complete read request");
 }
 
 void aio_write_completion_handler(sigval_t sigval)
 {
-	if (DEBUG)
-	{
-		fprintf(stdout, "In write completion handler\n");
-		fflush(stdout);
-	}
+	LOG_DEBUG("In write completion handler\n")
 
-	JNIEnv *env;
-	jint res = jvm->AttachCurrentThread((void**)&env, NULL);
-	if (res < 0)
-	{
-		fprintf(stderr, "Failed to attach JVM to AIO write thread\n");
-		return;
-	}
-	
-	jobject future = (jobject)sigval.sival_ptr;
-	struct aiocb *req = (struct aiocb *)env->GetLongField(future, posixAioFutureReadWrite_aiocbPtrID);
-	
-	/* Did the request complete? */
-	if (aio_error(req) == 0) {
-		/* Request completed successfully, get number of bytes processed */
-		int ret = aio_return( req );
-		
-		// Adjust buffer position
-		jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
-		int position = getBufferPosition(env, buffer) + ret;
-		setBufferPosition(env, buffer, position);
-		
-		// Call aio listeners
-		env->CallVoidMethod(future, posixAioFutureReadWrite_processFutureListenersID);
-		
-		// Free resources		
-		free(req);
-		env->DeleteGlobalRef(future);
-	} else {
-		// TODO Find a way to handle exception here
-		fprintf(stderr, "ERROR: AIO write request did NOT complete\n");
-		fflush(stderr);
-	}
-
-	jvm->DetachCurrentThread();
-	return;
+	aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM to AIO write thread\n", "Could not complete write request");
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Mon Nov 27 12:47:37 2006
@@ -8,9 +8,17 @@
 	
 	public void removeListener(AioFutureListener<T> ioFutureListener);
 	
+	public void addErrorListener(AioFutureListener<T> ioFutureListener);
+	
+	public void removeErrorListener(AioFutureListener<T> ioFutureListener);
+	
 	public AsynchronousFileChannel getChannel();
 	
 	public boolean isCompleted();
+
+	public boolean isSuccessful();
+	
+	public Throwable getException();
 	
 	public void join() throws InterruptedException;
 	

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java Mon Nov 27 12:47:37 2006
@@ -1,10 +1,7 @@
 package org.apache.aio;
 
-public interface AioFutureBatch extends AioFuture {
-	
-	public void addListener(AioFutureListener<AioFutureBatch> ioFutureListener);
-	
-	public void removeListener(AioFutureListener<AioFutureBatch> ioFutureListener);
-	
+public interface AioFutureBatch extends AioFuture<AioFutureBatch> {
+
 	public BatchRequest[] getRequests();
+
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java Mon Nov 27 12:47:37 2006
@@ -14,7 +14,9 @@
 	private final AsynchronousFileChannel channel;
 	private final Operation operation;
 	private final List<AioFutureListener<T>> listeners = new LinkedList<AioFutureListener<T>>();
+	private final List<AioFutureListener<T>> errorListeners = new LinkedList<AioFutureListener<T>>();
 	private volatile boolean completed = false;
+	private volatile Throwable exception = null;
 
 	protected AbstractAioFuture(AsynchronousFileChannel channel, Operation operation) {
 		this.channel = channel;
@@ -30,6 +32,23 @@
 		}
 	}
 
+	public synchronized void removeListener(AioFutureListener<T> ioFutureListener) {
+		listeners.remove(ioFutureListener);
+	}
+
+	@SuppressWarnings("unchecked")
+	public synchronized void addErrorListener(AioFutureListener<T> ioFutureListener) {
+		if (exception != null) {
+			ioFutureListener.onCompletion((T)this);
+		} else {
+			errorListeners.add(ioFutureListener);
+		}
+	}
+
+	public synchronized void removeErrorListener(AioFutureListener<T> ioFutureListener) {
+		errorListeners.remove(ioFutureListener);
+	}
+
 	public AsynchronousFileChannel getChannel() {
 		return channel;
 	}
@@ -41,6 +60,14 @@
 	public boolean isCompleted() {
 		return completed;
 	}
+	
+	public boolean isSuccessful() {
+		return exception == null;
+	}
+	
+	public Throwable getException() {
+		return exception;
+	}
 
 	public synchronized void join() throws InterruptedException {
 		if (!completed) {
@@ -55,10 +82,6 @@
 		return isCompleted();
 	}
 
-	public synchronized void removeListener(AioFutureListener<T> ioFutureListener) {
-		listeners.remove(ioFutureListener);
-	}
-
 	@SuppressWarnings("unchecked")
 	protected synchronized void processFutureListeners() {
 		completed = true;
@@ -68,4 +91,14 @@
 		}
 	}
 
+	@SuppressWarnings("unchecked")
+	protected synchronized void handleError(Throwable exception) {
+		this.exception = exception;
+		completed = true;
+		notifyAll();
+		for (AioFutureListener<T> listener : errorListeners) {
+			listener.onCompletion((T)this);
+		}
+	}
+	
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java Mon Nov 27 12:47:37 2006
@@ -30,9 +30,12 @@
 		achannel = new AsynchronousFileChannel(out.getFD());
 		buffer.clear();
 		buffer.put("Have a really nice day!\n".getBytes());
+		System.out.println("Position: " + buffer.position());
 		buffer.flip();
+		System.out.println("Position: " + buffer.position());
 		future = achannel.write(buffer, 0);
-		System.out.println(future);
+		future.join();
+		System.out.println("Position: " + buffer.position());
 	}
 		
 }

Modified: mina/sandbox/mheath/aioj/trunk/todo.txt
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/todo.txt?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/todo.txt (original)
+++ mina/sandbox/mheath/aioj/trunk/todo.txt Mon Nov 27 12:47:37 2006
@@ -1,7 +1,12 @@
 - Add support for heap buffers
 - Throw IO Exceptions when aio calls return an error
 - Figure out batch requests
+- Add support for timeouts and error handling
+- Add support for receiving file modification events
+- Make sure we're checking for errors on all JNI method calls
 
 === Testing ===
 - Make sure the AsynchronousFileChannel can be unloaded without holding onto native references
-- Test for memory leaks
\ No newline at end of file
+- Test for memory leaks
+- Do performance tests
+- Write unit tests
\ No newline at end of file