You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2006/11/27 21:47:38 UTC
svn commit: r479760 - in /mina/sandbox/mheath/aioj/trunk: ./ src/main/c/
src/main/java/org/apache/aio/ src/main/java/org/apache/aio/common/
src/test/java/
Author: mheath
Date: Mon Nov 27 12:47:37 2006
New Revision: 479760
URL: http://svn.apache.org/viewvc?view=rev&rev=479760
Log:
Add error handling callbacks.
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java
mina/sandbox/mheath/aioj/trunk/todo.txt
Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Mon Nov 27 12:47:37 2006
@@ -6,8 +6,19 @@
#include "org_apache_aio_AsynchronousFileChannel.h"
#include "org_apache_aio_posix_PosixAioFutureReadWrite.h"
-#define DEBUG 1
+#define DEBUG
+#ifdef DEBUG
+#define LOG_DEBUG(s) { fprintf(stdout, s); fflush(stdout); }
+#endif
+
+#ifndef DEBUG
+#define LOG_DEBUG(s)
+#endif
+
+// --- jvm handler ---
+
+// --- Exception classes ---
static JavaVM *jvm;
static jclass ioException;
static jclass nullPointerException;
@@ -15,9 +26,10 @@
// --- Classes and IDs for PosixAioFutureReadWrite ---
static jclass posixAioFutureReadWrite;
static jmethodID CID_posixAioFutureReadWrite;
-static jmethodID posixAioFutureReadWrite_processFutureListenersID;
static jfieldID posixAioFutureReadWrite_aiocbPtrID;
static jfieldID posixAioFutureReadWrite_bufferID;
+static jmethodID abstractAioFuture_processFutureListenersID;
+static jmethodID abstractAioFuture_handleErrorID;
static jfieldID fdID; // ID for java.io.FileDescriptor.fd
static jfieldID fieldDescID; // ID for org.apache.aio.AsynchronousFileChannel.fd
@@ -85,14 +97,12 @@
struct aiocb *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, jobject buffer, jlong position, jobject operation)
{
// Get address and capacity of buffer
- if (buffer == NULL)
- {
+ if (buffer == NULL) {
env->ThrowNew(nullPointerException, "buffer cannot be null");
}
// TODO: Add support for non direct byte buffers
jbyte *bufferAddress = (jbyte *)env->GetDirectBufferAddress(buffer);
- if (bufferAddress == NULL)
- {
+ if (bufferAddress == NULL) {
env->ThrowNew(ioException, "Must use direct ByteBuffer");
return NULL;
}
@@ -114,8 +124,7 @@
req->aio_nbytes = bufferSize;
req->aio_sigevent.sigev_notify = SIGEV_THREAD;
- if (operation == operationRead)
- {
+ if (operation == operationRead) {
req->aio_sigevent.sigev_notify_function = aio_read_completion_handler;
} else if (operation == operationWrite) {
req->aio_sigevent.sigev_notify_function = aio_write_completion_handler;
@@ -130,18 +139,16 @@
JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved)
{
jint JNIversion = JNI_VERSION_1_4;
- if (DEBUG)
- {
- fprintf(stdout, "Initializing native code for JNI version 0x%x\n", JNIversion);
- fflush(stdout);
- }
+#ifdef DEBUG
+ fprintf(stdout, "Initializing native code for JNI version 0x%x\n", JNIversion);
+ fflush(stdout);
+#endif
// Initialize static jvm pointer.
jvm = vm;
JNIEnv *env;
- if (vm->GetEnv((void**)&env, JNIversion))
- {
+ if (vm->GetEnv((void**)&env, JNIversion)) {
return JNI_ERR;
}
@@ -155,10 +162,13 @@
cls = env->FindClass("org/apache/aio/posix/PosixAioFutureReadWrite");
posixAioFutureReadWrite = (jclass)env->NewWeakGlobalRef(cls);
CID_posixAioFutureReadWrite = env->GetMethodID(posixAioFutureReadWrite, "<init>", "(Lorg/apache/aio/AsynchronousFileChannel;Lorg/apache/aio/Operation;JLjava/nio/ByteBuffer;J)V");
- posixAioFutureReadWrite_processFutureListenersID = env->GetMethodID(posixAioFutureReadWrite, "processFutureListeners", "()V");
posixAioFutureReadWrite_aiocbPtrID = env->GetFieldID(posixAioFutureReadWrite, "aiocbPtr", "J");
posixAioFutureReadWrite_bufferID = env->GetFieldID(posixAioFutureReadWrite, "buffer", "Ljava/nio/ByteBuffer;");
+ cls = env->FindClass("org/apache/aio/common/AbstractAioFuture");
+ abstractAioFuture_processFutureListenersID = env->GetMethodID(cls, "processFutureListeners", "()V");
+ abstractAioFuture_handleErrorID = env->GetMethodID(cls, "handleError", "(Ljava/lang/Throwable;)V");
+
cls = env->FindClass("java/io/FileDescriptor");
fdID = env->GetFieldID(cls, "fd", "I");
@@ -183,8 +193,7 @@
{
jint JNIversion = JNI_VERSION_1_4;
JNIEnv *env;
- if (vm->GetEnv((void**)&env, JNIversion))
- {
+ if (vm->GetEnv((void**)&env, JNIversion)) {
return;
}
@@ -201,15 +210,13 @@
JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_write
(JNIEnv *env, jobject obj, jobject buffer, jlong position)
{
- if (DEBUG)
- {
- fprintf(stdout, "aio write at file position %d\n", position);
- fflush(stdout);
- }
+#ifdef DEBUG
+ fprintf(stdout, "aio write at file position %d\n", position);
+ fflush(stdout);
+#endif
struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationWrite);
int ret = aio_write(req);
- if (ret)
- {
+ if (ret) {
// TODO Handle errors from ret
// return a null on error.
return NULL;
@@ -220,20 +227,14 @@
JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_read
(JNIEnv *env, jobject obj, jobject buffer, jlong position)
{
- if (DEBUG)
- {
- fprintf(stdout, "aio read at file position %d\n", position);
- fflush(stdout);
- }
+#ifdef DEBUG
+ fprintf(stdout, "aio read at file position %d\n", position);
+ fflush(stdout);
+#endif
struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationRead);
- if (DEBUG)
- {
- fprintf(stdout, "Do aio read\n");
- fflush(stdout);
- }
+ LOG_DEBUG("Do aio read\n");
int ret = aio_read(req);
- if (ret)
- {
+ if (ret) {
// TODO Handle errors from ret
// return a null on error.
return NULL;
@@ -246,21 +247,21 @@
JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchRead
(JNIEnv *env, jobject obj, jobjectArray batch)
{
- printf("batchRead\n");
+ LOG_DEBUG("batchRead\n")
return NULL;
}
JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchWrite
(JNIEnv *env, jobject obj, jobjectArray batch)
{
- printf("batchWrite\n");
+ LOG_DEBUG("batchWrite\n");
return NULL;
}
JNIEXPORT void JNICALL Java_org_apache_aio_AsynchronousFileChannel_suspend
(JNIEnv *env, jobject obj, jobjectArray futures)
{
- printf("suspend\n");
+ LOG_DEBUG("suspend\n");
}
// --- PosixAioFutureReadWrite methods ---------------------------------------------------
@@ -269,8 +270,7 @@
{
struct aiocb *req = (struct aiocb *)env->GetLongField(obj, posixAioFutureReadWrite_aiocbPtrID);
int ret = aio_cancel(req->aio_fildes, req);
- if (ret == AIO_CANCELED)
- {
+ if (ret == AIO_CANCELED) {
free(req);
return 1;
}
@@ -278,19 +278,16 @@
}
// --- Completion handlers ----------------------------------------------------------------
-void aio_read_completion_handler(sigval_t sigval)
-{
- if (DEBUG)
- {
- fprintf(stdout, "In read completion handler\n");
- fflush(stdout);
- }
+// TODO: Consolidate completion handlers
+enum operation_mode {READ, WRITE};
+void aio_read_write_completion_handler(sigval_t sigval, operation_mode mode, char *jvmAttachErrorMessage, char *failureMessage)
+{
JNIEnv *env;
jint res = jvm->AttachCurrentThread((void**)&env, NULL);
- if (res < 0)
- {
- fprintf(stderr, "Failed to attach JVM to AIO read thread\n");
+ if (res < 0) {
+ fprintf(stderr, jvmAttachErrorMessage);
+ fflush(stderr);
return;
}
@@ -302,68 +299,45 @@
/* Request completed successfully, get number of bytes processed */
int ret = aio_return( req );
- // Adjust buffer limit
- jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
- int limit = ret + getBufferPosition(env, buffer);
- setBufferLimit(env, buffer, limit);
+ if (mode == READ) {
+ // Adjust buffer limit
+ jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
+ int limit = ret + getBufferPosition(env, buffer);
+ setBufferLimit(env, buffer, limit);
+ } else if (mode == WRITE) {
+ // Adjust buffer position
+ jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
+ int position = getBufferPosition(env, buffer) + ret;
+ setBufferPosition(env, buffer, position);
+ }
// Call aio listeners
- env->CallVoidMethod(future, posixAioFutureReadWrite_processFutureListenersID);
-
- // Free resources
- free(req);
- env->DeleteGlobalRef(future);
+ env->CallVoidMethod(future, abstractAioFuture_processFutureListenersID);
} else {
- // TODO Find a way to handle exception here
- fprintf(stderr, "ERROR: AIO read request did NOT complete\n");
- fflush(stderr);
+ // TODO Write a unit test for this
+ env->ThrowNew(ioException, failureMessage);
+ jthrowable e = env->ExceptionOccurred();
+ env->CallVoidMethod(future, abstractAioFuture_handleErrorID, e);
}
+
+ // Free resources
+ free(req);
+ env->DeleteGlobalRef(future);
jvm->DetachCurrentThread();
return;
+}
+
+void aio_read_completion_handler(sigval_t sigval)
+{
+ LOG_DEBUG("In read completion handler\n")
+
+ aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM to AIO read thread\n", "Could not complete read request");
}
void aio_write_completion_handler(sigval_t sigval)
{
- if (DEBUG)
- {
- fprintf(stdout, "In write completion handler\n");
- fflush(stdout);
- }
+ LOG_DEBUG("In write completion handler\n")
- JNIEnv *env;
- jint res = jvm->AttachCurrentThread((void**)&env, NULL);
- if (res < 0)
- {
- fprintf(stderr, "Failed to attach JVM to AIO write thread\n");
- return;
- }
-
- jobject future = (jobject)sigval.sival_ptr;
- struct aiocb *req = (struct aiocb *)env->GetLongField(future, posixAioFutureReadWrite_aiocbPtrID);
-
- /* Did the request complete? */
- if (aio_error(req) == 0) {
- /* Request completed successfully, get number of bytes processed */
- int ret = aio_return( req );
-
- // Adjust buffer position
- jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
- int position = getBufferPosition(env, buffer) + ret;
- setBufferPosition(env, buffer, position);
-
- // Call aio listeners
- env->CallVoidMethod(future, posixAioFutureReadWrite_processFutureListenersID);
-
- // Free resources
- free(req);
- env->DeleteGlobalRef(future);
- } else {
- // TODO Find a way to handle exception here
- fprintf(stderr, "ERROR: AIO write request did NOT complete\n");
- fflush(stderr);
- }
-
- jvm->DetachCurrentThread();
- return;
+ aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM to AIO write thread\n", "Could not complete write request");
}
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Mon Nov 27 12:47:37 2006
@@ -8,9 +8,17 @@
public void removeListener(AioFutureListener<T> ioFutureListener);
+ public void addErrorListener(AioFutureListener<T> ioFutureListener);
+
+ public void removeErrorListener(AioFutureListener<T> ioFutureListener);
+
public AsynchronousFileChannel getChannel();
public boolean isCompleted();
+
+ public boolean isSuccessful();
+
+ public Throwable getException();
public void join() throws InterruptedException;
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFutureBatch.java Mon Nov 27 12:47:37 2006
@@ -1,10 +1,7 @@
package org.apache.aio;
-public interface AioFutureBatch extends AioFuture {
-
- public void addListener(AioFutureListener<AioFutureBatch> ioFutureListener);
-
- public void removeListener(AioFutureListener<AioFutureBatch> ioFutureListener);
-
+public interface AioFutureBatch extends AioFuture<AioFutureBatch> {
+
public BatchRequest[] getRequests();
+
}
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java Mon Nov 27 12:47:37 2006
@@ -14,7 +14,9 @@
private final AsynchronousFileChannel channel;
private final Operation operation;
private final List<AioFutureListener<T>> listeners = new LinkedList<AioFutureListener<T>>();
+ private final List<AioFutureListener<T>> errorListeners = new LinkedList<AioFutureListener<T>>();
private volatile boolean completed = false;
+ private volatile Throwable exception = null;
protected AbstractAioFuture(AsynchronousFileChannel channel, Operation operation) {
this.channel = channel;
@@ -30,6 +32,23 @@
}
}
+ public synchronized void removeListener(AioFutureListener<T> ioFutureListener) {
+ listeners.remove(ioFutureListener);
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized void addErrorListener(AioFutureListener<T> ioFutureListener) {
+ if (exception != null) {
+ ioFutureListener.onCompletion((T)this);
+ } else {
+ errorListeners.add(ioFutureListener);
+ }
+ }
+
+ public synchronized void removeErrorListener(AioFutureListener<T> ioFutureListener) {
+ errorListeners.remove(ioFutureListener);
+ }
+
public AsynchronousFileChannel getChannel() {
return channel;
}
@@ -41,6 +60,14 @@
public boolean isCompleted() {
return completed;
}
+
+ public boolean isSuccessful() {
+ return exception == null;
+ }
+
+ public Throwable getException() {
+ return exception;
+ }
public synchronized void join() throws InterruptedException {
if (!completed) {
@@ -55,10 +82,6 @@
return isCompleted();
}
- public synchronized void removeListener(AioFutureListener<T> ioFutureListener) {
- listeners.remove(ioFutureListener);
- }
-
@SuppressWarnings("unchecked")
protected synchronized void processFutureListeners() {
completed = true;
@@ -68,4 +91,14 @@
}
}
+ @SuppressWarnings("unchecked")
+ protected synchronized void handleError(Throwable exception) {
+ this.exception = exception;
+ completed = true;
+ notifyAll();
+ for (AioFutureListener<T> listener : errorListeners) {
+ listener.onCompletion((T)this);
+ }
+ }
+
}
Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/AIOTest.java Mon Nov 27 12:47:37 2006
@@ -30,9 +30,12 @@
achannel = new AsynchronousFileChannel(out.getFD());
buffer.clear();
buffer.put("Have a really nice day!\n".getBytes());
+ System.out.println("Position: " + buffer.position());
buffer.flip();
+ System.out.println("Position: " + buffer.position());
future = achannel.write(buffer, 0);
- System.out.println(future);
+ future.join();
+ System.out.println("Position: " + buffer.position());
}
}
Modified: mina/sandbox/mheath/aioj/trunk/todo.txt
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/todo.txt?view=diff&rev=479760&r1=479759&r2=479760
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/todo.txt (original)
+++ mina/sandbox/mheath/aioj/trunk/todo.txt Mon Nov 27 12:47:37 2006
@@ -1,7 +1,12 @@
- Add support for heap buffers
- Throw IO Exceptions when aio calls return an error
- Figure out batch requests
+- Add support for timeouts and error handling
+- Add support for receiving file modification events
+- Make sure we're checking for errors on all JNI method calls
=== Testing ===
- Make sure the AsynchronousFileChannel can be unloaded without holding onto native references
-- Test for memory leaks
\ No newline at end of file
+- Test for memory leaks
+- Do performance tests
+- Write unit tests
\ No newline at end of file