You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/01/09 07:23:56 UTC
svn commit: r494327 - in /mina/sandbox/mheath/aioj/trunk: ./ src/main/c/
src/main/java/org/apache/aio/ src/main/java/org/apache/aio/common/
Author: mheath
Date: Mon Jan 8 22:23:54 2007
New Revision: 494327
URL: http://svn.apache.org/viewvc?view=rev&rev=494327
Log:
Added write support
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
mina/sandbox/mheath/aioj/trunk/todo.txt
Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile Mon Jan 8 22:23:54 2007
@@ -28,6 +28,11 @@
$(TARGET): org_apache_aio.cpp $(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h
g++ -shared -lrt -lstdc++ $(INCLUDES) org_apache_aio.cpp -o $(TARGET)
-$(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h: $(TARGET_DIR)/classes/org/apache/aio/AsynchronousFileChannel.class $(TARGET_DIR)/classes/org/apache/aio/posix/PosixAioFutureReadWrite.class
+$(WORKING_DIR)/org_apache_aio_AsynchronousFileChannel.h: $(TARGET_DIR)/classes/org/apache/aio/AsynchronousFileChannel.class
mkdir -p $(TARGET_DIR)/jni
- javah -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.AsynchronousFileChannel org.apache.aio.posix.PosixAioFutureReadWrite
\ No newline at end of file
+ javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.AsynchronousFileChannel
+
+$(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureReadWrite.h: $(TARGET_DIR)/classes/org/apache/aio/posix/PosixAioFutureReadWrite.class
+ mkdir -p $(TARGET_DIR)/jni
+ javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.posix.PosixAioFutureReadWrite
+
\ No newline at end of file
Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Mon Jan 8 22:23:54 2007
@@ -16,10 +16,19 @@
#define LOG_DEBUG(s)
#endif
+struct aio_request
+{
+ struct aiocb aio;
+ jobject operation;
+ jobject future;
+ jint allocated_buffer_size;
+ jbyte buffer[];
+};
+
// --- jvm handler ---
+static JavaVM *jvm;
// --- Exception classes ---
-static JavaVM *jvm;
static jclass ioException;
static jclass nullPointerException;
@@ -31,9 +40,11 @@
static jmethodID abstractAioFuture_processFutureListenersID;
static jmethodID abstractAioFuture_handleErrorID;
+// --- IDs for file descriptors ---
static jfieldID fdID; // ID for java.io.FileDescriptor.fd
static jfieldID fieldDescID; // ID for org.apache.aio.AsynchronousFileChannel.fd
+// --- Operation enums ---
static jobject operationRead;
static jobject operationWrite;
static jobject operationBatchRead;
@@ -43,12 +54,11 @@
{
JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved);
JNIEXPORT void JNICALL JNI_OnUnload(JavaVM *vm, void *reserved);
- void aio_read_completion_handler(sigval_t sigval);
- void aio_write_completion_handler(sigval_t sigval);
+ void aio_read_write_completion_handler(sigval_t sigval);
}
// --- Utility Functions ----------------------------------------------------
-jint getFD(JNIEnv *env, jobject asynchFileChannel)
+inline jint getFD(JNIEnv *env, jobject asynchFileChannel)
{
jobject fieldDesc = env->GetObjectField(asynchFileChannel, fieldDescID);
return env->GetIntField(fieldDesc, fdID);
@@ -58,13 +68,16 @@
{
jclass cls = env->GetObjectClass(buffer);
jmethodID MID_position = env->GetMethodID(cls, "position", "()I");
- return env->CallIntMethod(buffer, MID_position);
+ env->DeleteLocalRef(cls);
+ jint ret = env->CallIntMethod(buffer, MID_position);
+ return ret;
}
inline void setBufferPosition(JNIEnv *env, jobject buffer, jint position)
{
jclass cls = env->GetObjectClass(buffer);
jmethodID MID_position = env->GetMethodID(cls, "position", "(I)Ljava/nio/Buffer;");
+ env->DeleteLocalRef(cls);
env->CallVoidMethod(buffer, MID_position, position);
}
@@ -72,6 +85,7 @@
{
jclass cls = env->GetObjectClass(buffer);
jmethodID MID_limit = env->GetMethodID(cls, "limit", "()I");
+ env->DeleteLocalRef(cls);
return env->CallIntMethod(buffer, MID_limit);
}
@@ -79,10 +93,11 @@
{
jclass cls = env->GetObjectClass(buffer);
jmethodID MID_limit = env->GetMethodID(cls, "limit", "(I)Ljava/nio/Buffer;");
+ env->DeleteLocalRef(cls);
env->CallVoidMethod(buffer, MID_limit, limit);
}
-jobject createPosixAioFutureReadWrite(JNIEnv *env, jobject channel, jobject operation, void *aiocb, jobject buffer, long position)
+inline jobject createPosixAioFutureReadWrite(JNIEnv *env, jobject channel, jobject operation, void *aiocb, jobject buffer, long position)
{
jvalue values[5];
values[0].l = channel;
@@ -94,43 +109,58 @@
return env->NewGlobalRef(future);
}
-struct aiocb *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, jobject buffer, jlong position, jobject operation)
+struct aio_request *setupAioRequest(JNIEnv *env, jobject asynchronousFileChannel, jobject buffer, jlong position, jobject operation)
{
- // Get address and capacity of buffer
+ // Get address and capacity of buffer
if (buffer == NULL) {
env->ThrowNew(nullPointerException, "buffer cannot be null");
+ return NULL;
}
- // TODO: Add support for non direct byte buffers
+ // Adjust bufferAddress by the position of the buffer
+ jint bufferPosition = getBufferPosition(env, buffer);
+ jint bufferLimit = getBufferLimit(env, buffer)
+ jint bufferSize = bufferLimit - bufferPosition;
+
+ jint bufferToAllocate;
jbyte *bufferAddress = (jbyte *)env->GetDirectBufferAddress(buffer);
if (bufferAddress == NULL) {
- env->ThrowNew(ioException, "Must use direct ByteBuffer");
- return NULL;
+ bufferToAllocate = bufferSize;
+ } else {
+ bufferAddress += bufferPosition;
+ bufferToAllocate = 0;
}
- // Adjust bufferAddress by the position of the buffer
- jint bufferPosition = getBufferPosition(env, buffer);
- bufferAddress += bufferPosition;
- jint bufferSize = getBufferLimit(env, buffer) - position;
// Allocate aiocb
- struct aiocb *req = (aiocb*)malloc(sizeof(aiocb));
- bzero(req, sizeof(struct aiocb));
+ int aio_request_size = sizeof(aio_request) + bufferToAllocate;
+ struct aio_request *req = (aio_request *)malloc(aio_request_size);
+ bzero(req, aio_request_size);
+
+ if (bufferAddress == NULL) {
+ req->allocated_buffer_size = bufferToAllocate;
+ bufferAddress = req->buffer;
+
+ //jbytearray env->NewByteArray(bufferSize);
+
+ // TODO: For write, copy the data from buffer into the allocated buffer and advance the buffer's position
+ }
+
+ // TODO: For write, increment position
- jobject future = createPosixAioFutureReadWrite(env, asynchronousFileChannel, operation, req, buffer, position);
+ req->future = createPosixAioFutureReadWrite(env, asynchronousFileChannel, operation, req, buffer, position);
// Setup aiocb
- req->aio_fildes = getFD(env, asynchronousFileChannel);
- req->aio_offset = position;
- req->aio_buf = bufferAddress;
- req->aio_nbytes = bufferSize;
-
- req->aio_sigevent.sigev_notify = SIGEV_THREAD;
- if (operation == operationRead) {
- req->aio_sigevent.sigev_notify_function = aio_read_completion_handler;
- } else if (operation == operationWrite) {
- req->aio_sigevent.sigev_notify_function = aio_write_completion_handler;
- }
- req->aio_sigevent.sigev_notify_attributes = NULL;
- req->aio_sigevent.sigev_value.sival_ptr = future;
+ req->aio.aio_fildes = getFD(env, asynchronousFileChannel);
+ req->aio.aio_offset = position;
+ req->aio.aio_buf = bufferAddress;
+ req->aio.aio_nbytes = bufferSize;
+
+ // Set other aio request fields
+ req->operation = operation;
+
+ req->aio.aio_sigevent.sigev_notify = SIGEV_THREAD;
+ req->aio.aio_sigevent.sigev_notify_function = aio_read_write_completion_handler;
+ req->aio.aio_sigevent.sigev_notify_attributes = NULL;
+ req->aio.aio_sigevent.sigev_value.sival_ptr = req;
return req;
}
@@ -214,14 +244,15 @@
fprintf(stdout, "aio write at file position %d\n", position);
fflush(stdout);
#endif
- struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationWrite);
- int ret = aio_write(req);
+ struct aio_request *req = setupAioRequest(env, obj, buffer, position, operationWrite);
+
+ int ret = aio_write(&req->aio);
if (ret) {
// TODO Handle errors from ret
// return a null on error.
return NULL;
}
- return (jobject)req->aio_sigevent.sigev_value.sival_ptr;
+ return req->future;
}
JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_read
@@ -231,9 +262,9 @@
fprintf(stdout, "aio read at file position %d\n", position);
fflush(stdout);
#endif
- struct aiocb *req = setupAioRequest(env, obj, buffer, position, operationRead);
+ struct aio_request *req = setupAioRequest(env, obj, buffer, position, operationRead);
LOG_DEBUG("Do aio read\n");
- int ret = aio_read(req);
+ int ret = aio_read(&req->aio);
if (ret) {
// TODO Handle errors from ret
// return a null on error.
@@ -241,7 +272,7 @@
}
// Return future object
- return (jobject)req->aio_sigevent.sigev_value.sival_ptr;
+ return req->future;
}
JNIEXPORT jobject JNICALL Java_org_apache_aio_AsynchronousFileChannel_batchRead
@@ -268,8 +299,8 @@
JNIEXPORT jboolean JNICALL Java_org_apache_aio_posix_PosixAioFutureReadWrite_cancel
(JNIEnv *env, jobject obj)
{
- struct aiocb *req = (struct aiocb *)env->GetLongField(obj, posixAioFutureReadWrite_aiocbPtrID);
- int ret = aio_cancel(req->aio_fildes, req);
+ struct aio_request *req = (struct aio_request *)env->GetLongField(obj, posixAioFutureReadWrite_aiocbPtrID);
+ int ret = aio_cancel(req->aio.aio_fildes, &req->aio);
if (ret == AIO_CANCELED) {
free(req);
return 1;
@@ -278,65 +309,48 @@
}
// --- Completion handlers ----------------------------------------------------------------
-enum operation_mode {READ, WRITE};
-
-void aio_read_write_completion_handler(sigval_t sigval, operation_mode mode, char *jvmAttachErrorMessage, char *failureMessage)
+void aio_read_write_completion_handler(sigval_t sigval)
{
+ LOG_DEBUG("In AIO completion handler\n")
+
JNIEnv *env;
jint res = jvm->AttachCurrentThread((void**)&env, NULL);
if (res < 0) {
- fprintf(stderr, jvmAttachErrorMessage);
+ fprintf(stderr, "Could not attach JVM to AIO thread");
fflush(stderr);
return;
}
- jobject future = (jobject)sigval.sival_ptr;
- struct aiocb *req = (struct aiocb *)env->GetLongField(future, posixAioFutureReadWrite_aiocbPtrID);
+ struct aio_request *req = (aio_request *)sigval.sival_ptr;
/* Did the request complete? */
- if (aio_error(req) == 0) {
+ if (aio_error(&req->aio) == 0) {
/* Request completed successfully, get number of bytes processed */
- int ret = aio_return( req );
-
- if (mode == READ) {
+ int ret = aio_return(&req->aio);
+
+ if (req->operation == operationRead) {
+ jobject buffer = env->GetObjectField(req->future, posixAioFutureReadWrite_bufferID);
// Adjust buffer limit
- jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
+ // TODO: Add support for writing read data to heap byte buffer
+
+ // Get buffer position
int limit = ret + getBufferPosition(env, buffer);
setBufferLimit(env, buffer, limit);
- } else if (mode == WRITE) {
- // Adjust buffer position
- jobject buffer = env->GetObjectField(future, posixAioFutureReadWrite_bufferID);
- int position = getBufferPosition(env, buffer) + ret;
- setBufferPosition(env, buffer, position);
}
// Call aio listeners
- env->CallVoidMethod(future, abstractAioFuture_processFutureListenersID);
+ env->CallVoidMethod(req->future, abstractAioFuture_processFutureListenersID);
} else {
// TODO Write a unit test for this
- env->ThrowNew(ioException, failureMessage);
+ env->ThrowNew(ioException, "Could not complete AIO request");
jthrowable e = env->ExceptionOccurred();
- env->CallVoidMethod(future, abstractAioFuture_handleErrorID, e);
+ env->CallVoidMethod(req->future, abstractAioFuture_handleErrorID, e);
}
// Free resources
+ env->DeleteGlobalRef(req->future);
free(req);
- env->DeleteGlobalRef(future);
jvm->DetachCurrentThread();
return;
}
-
-void aio_read_completion_handler(sigval_t sigval)
-{
- LOG_DEBUG("In read completion handler\n")
-
- aio_read_write_completion_handler(sigval, READ, "Failed to attach JVM to AIO read thread\n", "Could not complete read request");
-}
-
-void aio_write_completion_handler(sigval_t sigval)
-{
- LOG_DEBUG("In write completion handler\n")
-
- aio_read_write_completion_handler(sigval, WRITE, "Failed to attach JVM to AIO write thread\n", "Could not complete write request");
-}
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Mon Jan 8 22:23:54 2007
@@ -4,6 +4,12 @@
public interface AioFuture<T extends AioFuture> {
+ public boolean cancel();
+
+ public boolean isCancelled();
+
+ public boolean isDone();
+
public void addListener(AioFutureListener<T> ioFutureListener);
public void removeListener(AioFutureListener<T> ioFutureListener);
@@ -14,17 +20,11 @@
public AsynchronousFileChannel getChannel();
- public boolean isCompleted();
-
- public boolean isSuccessful();
-
public Throwable getException();
public void join() throws InterruptedException;
public boolean join(long timeout, TimeUnit timeUnit) throws InterruptedException;
-
- public boolean cancel();
public Operation getOperation();
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AsynchronousFileChannel.java Mon Jan 8 22:23:54 2007
@@ -1,12 +1,19 @@
package org.apache.aio;
+import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.Channel;
-public class AsynchronousFileChannel {
+/**
+ *
+ * @author Mike Heath <mh...@apache.org>
+ */
+public class AsynchronousFileChannel implements Closeable, Channel {
static {
+ // TODO Set up some loader so we can store the .so in the aioj.jar
System.loadLibrary("aioj");
}
@@ -16,9 +23,8 @@
if (fd == null) {
throw new NullPointerException("fd cannot be null");
}
- SecurityManager sm = System.getSecurityManager();
- if (sm != null) {
- sm.checkWrite(fd);
+ if (!fd.valid()) {
+ throw new IOException("fd must be a valid FileDescriptor");
}
this.fd = fd;
}
@@ -31,11 +37,20 @@
return fd.valid();
}
- public native AioFutureReadWrite write(ByteBuffer buffer, long position) throws IOException;
+ /**
+ * Calling close closes the AsynchronousFileChannel but does not actually close
+ * the file. You must close the file object from which the file descriptor originated.
+ *
+ */
+ public void close() {
+ fd = new FileDescriptor();
+ }
public native AioFutureReadWrite read(ByteBuffer buffer, long position) throws IOException;
public native AioFutureBatch batchRead(BatchRequest... reads) throws IOException;
+
+ public native AioFutureReadWrite write(ByteBuffer buffer, long position) throws IOException;
public native AioFutureBatch batchWrite(BatchRequest... writes) throws IOException;
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/common/AbstractAioFuture.java Mon Jan 8 22:23:54 2007
@@ -57,11 +57,11 @@
return operation;
}
- public boolean isCompleted() {
+ public boolean isDone() {
return completed;
}
- public boolean isSuccessful() {
+ public boolean isCancelled() {
return exception == null;
}
@@ -79,7 +79,7 @@
if (!completed) {
wait(timeUnit.toMillis(timeout));
}
- return isCompleted();
+ return isDone();
}
@SuppressWarnings("unchecked")
Modified: mina/sandbox/mheath/aioj/trunk/todo.txt
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/todo.txt?view=diff&rev=494327&r1=494326&r2=494327
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/todo.txt (original)
+++ mina/sandbox/mheath/aioj/trunk/todo.txt Mon Jan 8 22:23:54 2007
@@ -1,13 +1,23 @@
- Add support for heap buffers
- Throw IO Exceptions when aio calls return an error
- Figure out batch requests
-- Add support for timeouts
+- Add support for timeouts -- does AIO itself have support for timeouts?
- Add support for receiving file modification events
- Make sure we're checking for errors on all JNI method calls
+- Modify AioFuture to extend Java 5 Future
+- Look into simplifying AioFuture using generics
+- Update ByteBuffer immediately
+- Add support for aio_fsync
+- Provide an asynchronous open
=== Testing ===
- Make sure the AsynchronousFileChannel can be unloaded without holding onto native references
- Test for memory leaks
- Do performance tests
- Write unit tests
-- Determine if doing AttachCurrentThread or AttachCurrentThreadAsDaemon is faster in callback
\ No newline at end of file
+- Determine if doing AttachCurrentThread or AttachCurrentThreadAsDaemon is faster in callback
+- Do a test using util.concurrent asynch I/O
+
+=== Things to ask Alan Bateman ===
+- What about asynchronous file opens?
+- Is there an easy way to get to the ByteBuffer in a completion handler when doing a read?