You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/02/24 23:43:52 UTC
svn commit: r511365 - in /mina/sandbox/mheath/aioj/trunk: ./ src/main/c/
src/main/java/org/apache/aio/ src/main/java/org/apache/aio/concurrent/
src/main/java/org/apache/aio/linux/ src/main/java/org/apache/aio/posix/
src/test/java/org/apache/aio/concurr...
Author: mheath
Date: Sat Feb 24 14:43:51 2007
New Revision: 511365
URL: http://svn.apache.org/viewvc?view=rev&rev=511365
Log:
Working aio read support.
Added:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
Modified:
mina/sandbox/mheath/aioj/trunk/pom.xml
mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
mina/sandbox/mheath/aioj/trunk/todo.txt
Modified: mina/sandbox/mheath/aioj/trunk/pom.xml
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/pom.xml?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/pom.xml (original)
+++ mina/sandbox/mheath/aioj/trunk/pom.xml Sat Feb 24 14:43:51 2007
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.aio</groupId>
- <artifactId>aio</artifactId>
+ <artifactId>aioj</artifactId>
<version>0.2-SNAPSHOT</version>
<name>Asynchronous File I/O in Java</name>
<description>
Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile Sat Feb 24 14:43:51 2007
@@ -25,7 +25,7 @@
all: $(TARGET)
-$(TARGET): org_apache_aio.cpp $(WORKING_DIR)/org_apache_aio_test_Test.h $(WORKING_DIR)/org_apache_aio_posix_PosixAsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_linux_LinuxAsynchronousFileChannel.h
+$(TARGET): org_apache_aio.cpp $(WORKING_DIR)/org_apache_aio_test_Test.h $(WORKING_DIR)/org_apache_aio_posix_PosixAsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_linux_LinuxAsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureImpl.h
g++ -shared -lrt -lstdc++ $(INCLUDES) org_apache_aio.cpp -o $(TARGET)
$(WORKING_DIR)/org_apache_aio_test_Test.h: $(TARGET_DIR)/classes/org/apache/aio/test/Test.class
@@ -35,6 +35,10 @@
$(WORKING_DIR)/org_apache_aio_posix_PosixAsynchronousFileChannel.h: $(TARGET_DIR)/classes/org/apache/aio/posix/PosixAsynchronousFileChannel.class
mkdir -p $(TARGET_DIR)/jni
javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.posix.PosixAsynchronousFileChannel
+
+# Generate the JNI header for PosixAioFutureImpl with javah once the class file has been compiled.
+$(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureImpl.h: $(TARGET_DIR)/classes/org/apache/aio/posix/PosixAioFutureImpl.class
+ mkdir -p $(TARGET_DIR)/jni
+ javah -force -classpath $(JAVAH_CLASSPATH) -d $(WORKING_DIR) org.apache.aio.posix.PosixAioFutureImpl
$(WORKING_DIR)/org_apache_aio_linux_LinuxAsynchronousFileChannel.h: $(TARGET_DIR)/classes/org/apache/aio/linux/LinuxAsynchronousFileChannel.class
mkdir -p $(TARGET_DIR)/jni
Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Sat Feb 24 14:43:51 2007
@@ -1,25 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
#include <aio.h>
+#include <errno.h>
#include <jni.h>
#include <malloc.h>
+#include <signal.h>
#include <stdio.h>
#include <string.h>
#include "org_apache_aio_test_Test.h"
+#include "org_apache_aio_posix_PosixAioFutureImpl.h"
#include "org_apache_aio_posix_PosixAsynchronousFileChannel.h"
#include "org_apache_aio_linux_LinuxAsynchronousFileChannel.h"
+// The signal number to use for AIO callbacks
+#define SIG_AIO (SIGRTMIN + 5)
+
#define DEBUG
#ifdef DEBUG
#define LOG_DEBUG(s) { fprintf(stdout, s); fflush(stdout); }
+#define LOG_DEBUG_PARAM(s, t) { fprintf(stdout, s, t); fflush(stdout); }
#endif
#ifndef DEBUG
#define LOG_DEBUG(s)
+#define LOG_DEBUG_PARAM(s, t)
#endif
#define JNI_VERSION JNI_VERSION_1_4
+#define NULL_CHECK(a) { if (a == NULL) { fprintf(stderr, "%s\n", "Error on " #a); return -1;} }
+
+// Describes one I/O buffer attached to an aio_request.
+struct buffer_size
+{
+ jint bufferSize; // The size of the buffer
+ jint allocatedBufferSize; // The number of bytes allocated for non-direct ByteBuffers
+ jbyte* buffer; // The address of the buffer
+};
+
+// Per-request bookkeeping. A pointer to this struct is stored in the aiocb's sigev_value.sival_ptr
+// so the completion handlers can find the request again.
+struct aio_request
+{
+ struct aiocb cb; // POSIX AIO control block handed to aio_read/aio_error/aio_return/aio_cancel
+ jobject future; // JNI global reference to the Java future; released by cleanupPosixAio
+ jint flags; // One of the Flags values (the read function stores READ_FLAG)
+ jint bufferCount; // Number of entries in buffers[]
+ struct buffer_size buffers[]; // bufferCount descriptors; allocateRequest reserves the space behind the struct
+};
+
+// Kind of operation an aio_request represents; stored in aio_request.flags.
+// NOTE(review): the enumerators are sequential (0..4), not distinct bits, yet
+// processPosixAioCallback tests them with "flags & READ_FLAG". That test is also
+// true for BATCH_READ (3) -- confirm whether this is intended.
+enum Flags {
+ NO_FLAG,
+ READ_FLAG,
+ WRITE_FLAG,
+ BATCH_READ,
+ BATCH_WRITE
+};
+
// --- jvm handler ---
static JavaVM *jvm;
@@ -28,6 +81,31 @@
static jclass ioException;
static jclass nullPointerException;
+// --- java.lang.Integer ids
+static jclass Integer;
+static jmethodID MID_Integer_valueOf;
+
+// --- java.io.FileDescriptor ids
+static jfieldID FID_fd;
+
+// --- org.apache.aio.posix.PosixAsynchronousFileChannel ids
+static jfieldID FID_posixChannelThreadNotify;
+static jfieldID FID_posixChannelFd;
+
+// --- org.apache.aio.posix.PosixAioFutureImpl
+static jfieldID FID_posixAioFuture_aioRequestPtr;
+static jmethodID MID_posixAioFuture_setValue;
+static jmethodID MID_posixAioFuture_setException;
+static jmethodID MID_posixAioFtgure_setValueInExecutorService;
+
+// --- org.apache.aio.posix.PosixAioFutureImpl.PosixReadWriteAioFuture
+static jclass posixAioReadWriteFuture;
+static jmethodID CID_posixAioReadWriteFuture;
+static jfieldID FID_posixAioReadWriteFuture_buffer;
+
+// --- Signal handler sigaction
+static struct sigaction sig_act;
+
extern "C"
{
JNIEXPORT jint JNICALL JNI_OnLoad(JavaVM *vm, void *reserved);
@@ -36,6 +114,248 @@
/**********************************************************************************************************************
*
+ * Utility functions
+ *
+ *********************************************************************************************************************/
+
+/*
+ * Looks up the java.io.FileDescriptor held by a PosixAsynchronousFileChannel object
+ * and returns the OS file descriptor stored inside it.
+ */
+inline jint getPosixChannelFD(JNIEnv *env, jobject posixChannel)
+{
+    return env->GetIntField(env->GetObjectField(posixChannel, FID_posixChannelFd), FID_fd);
+}
+
+/*
+ * Allocates a zero-initialised aio_request. If non-direct ByteBuffers are being used, this function also reserves
+ * space on the heap behind the request (see getBufferAddress).
+ * For write operations, the data from the non-direct ByteBuffer will need to be copied to that allocated space.
+ * For read operations, the data stored in that space will need to be copied to the non-direct ByteBuffer.
+ *
+ * Params:
+ * bufferCount - the number of buffers to allocate
+ * bufferSizeToAllocate - the total number of bytes that need to be allocated on the heap for non-direct ByteBuffers
+ * Returns:
+ * an aio_request struct ptr, or NULL if an argument is negative or the allocation fails
+ */
+struct aio_request *allocateRequest(int bufferCount, int bufferSizeToAllocate)
+{
+    if (bufferCount < 0 || bufferSizeToAllocate < 0) {
+        return NULL;
+    }
+
+    // Layout: [aio_request header][buffer_size x bufferCount][data area for non-direct buffers]
+    size_t request_size = sizeof(aio_request)
+            + sizeof(buffer_size) * (size_t)bufferCount
+            + (size_t)bufferSizeToAllocate;
+
+    // calloc zeroes the block, so the aiocb and all bookkeeping fields start out cleared
+    struct aio_request *request = (aio_request *)calloc(1, request_size);
+    if (request == NULL) {
+        return NULL;
+    }
+    request->bufferCount = bufferCount;
+
+    return request;
+}
+
+/*
+ * Returns the start of the heap space reserved by allocateRequest for the buffer at bufferIndex.
+ * The data area begins right after the buffers[] descriptor array; each preceding buffer occupies
+ * its allocatedBufferSize bytes, so those fields must be filled in before this is called.
+ *
+ * Returns NULL if request is NULL or bufferIndex is out of range.
+ */
+jbyte* getBufferAddress(struct aio_request *request, int bufferIndex)
+{
+    if (request == NULL || bufferIndex < 0 || bufferIndex >= request->bufferCount) {
+        return NULL;
+    }
+
+    // Skip the header and the descriptor array to reach the data area
+    jbyte *address = (jbyte *)&request->buffers[request->bufferCount];
+    for (int i = 0; i < bufferIndex; i++) {
+        address += request->buffers[i].allocatedBufferSize;
+    }
+    return address;
+}
+
+/*
+ * Releases everything owned by an aio_request: the global reference to the Java future (if one
+ * was created) and the request memory itself. A NULL request is ignored.
+ */
+void cleanupPosixAio(JNIEnv *env, struct aio_request *request)
+{
+    if (request == NULL) {
+        return;
+    }
+
+    jobject futureRef = request->future;
+    if (futureRef != NULL) {
+        env->DeleteGlobalRef(futureRef);
+    }
+    free(request);
+}
+
+/*
+ * Instantiates the Java read/write future through its (channel, long, ByteBuffer, long) constructor.
+ * The request pointer is passed as a jlong so the Java object can hand it back later.
+ * Returns a local reference, or NULL (after logging to stderr) if construction failed.
+ */
+jobject createPosixReadWriteAioFuture(JNIEnv *env, jobject channel, struct aio_request *request, jobject buffer, jlong position)
+{
+    jlong requestPtr = (jlong)request;
+    jobject created = env->NewObject(posixAioReadWriteFuture, CID_posixAioReadWriteFuture,
+            channel, requestPtr, buffer, position);
+    if (created != NULL) {
+        return created;
+    }
+    fprintf(stderr, "Could not create posix future object");
+    return NULL;
+}
+
+// Calls buffer.limit() on the given buffer object.
+inline jint getByteBufferLimit(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+    return env->CallIntMethod(buffer, env->GetMethodID(bufferClass, "limit", "()I"));
+}
+
+// Calls buffer.position() on the given buffer object.
+inline jint getByteBufferPosition(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+    return env->CallIntMethod(buffer, env->GetMethodID(bufferClass, "position", "()I"));
+}
+
+// Calls buffer.position(int) and drops the local reference to the Buffer it returns.
+inline void setByteBufferPosition(JNIEnv *env, jobject buffer, jclass bufferClass, jint position)
+{
+    jmethodID setter = env->GetMethodID(bufferClass, "position", "(I)Ljava/nio/Buffer;");
+    jobject returned = env->CallObjectMethod(buffer, setter, position);
+    env->DeleteLocalRef(returned);
+}
+
+// Calls buffer.hasArray() on the given buffer object.
+inline jboolean byteBufferHasArray(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+    return env->CallBooleanMethod(buffer, env->GetMethodID(bufferClass, "hasArray", "()Z"));
+}
+
+// Calls buffer.remaining() on the given buffer object.
+inline jint getByteBufferRemaining(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+    return env->CallIntMethod(buffer, env->GetMethodID(bufferClass, "remaining", "()I"));
+}
+
+// Calls buffer.array() and returns the result as a local jbyteArray reference.
+inline jbyteArray getByteBufferArray(JNIEnv *env, jobject buffer, jclass bufferClass)
+{
+    jobject backing = env->CallObjectMethod(buffer, env->GetMethodID(bufferClass, "array", "()[B"));
+    return (jbyteArray)backing;
+}
+
+/**********************************************************************************************************************
+ *
+ * POSIX AIO Callback functions
+ *
+ *********************************************************************************************************************/
+
+/*
+ * Finishes a completed POSIX AIO request and releases it.
+ *
+ * On success of a read, the bytes are delivered into the Java ByteBuffer (copied for non-direct buffers,
+ * already in place for direct ones), the buffer position is advanced by the number of bytes transferred,
+ * and the future is completed with that count as a java.lang.Integer.
+ * On failure the error is logged to stderr.
+ *
+ * Params:
+ * env - JNI environment of the calling thread
+ * request - the request taken from the notification's sival_ptr; always freed before returning
+ * callbackInExecutorService - true to complete the future via setValueInExecutorService, false via setValue
+ */
+void processPosixAioCallback(JNIEnv *env, struct aio_request *request, bool callbackInExecutorService)
+{
+    LOG_DEBUG("Processing callback\n");
+
+    int ret = aio_error(&request->cb);
+    LOG_DEBUG_PARAM("aio_error returned %d\n", ret);
+    if (ret != 0) {
+        const char *msg;
+        switch (ret) {
+            case EFAULT:
+                msg = "Memory fault";
+                break;
+            case EINVAL:
+                msg = "Invalid AIO Control Block";
+                break;
+            case EINPROGRESS:
+                // NOTE(review): the request is freed below although the operation is reported as still running
+                msg = "The AIO request was still in progress when the handler was called";
+                break;
+            case ECANCELED:
+                msg = "The AIO request was cancelled";
+                break;
+            default:
+                msg = "Unkown AIO error";
+                break;
+        }
+        // TODO call future setException method
+        // aio_error reports the failure through its return value (errno only when it returns -1),
+        // so describe that code instead of whatever errno happens to hold.
+        fprintf(stderr, "%s: %s\n", msg, strerror(ret == -1 ? errno : ret));
+        cleanupPosixAio(env, request);
+        return;
+    }
+
+    // Request completed successfully
+    int totalBytes = aio_return(&request->cb);
+    LOG_DEBUG_PARAM("Processed %d bytes\n", totalBytes);
+    if (totalBytes < 0) {
+        // TODO: Throw exception
+        perror("Error with aio_return");
+        cleanupPosixAio(env, request);
+        return;
+    }
+
+    if (request->flags & READ_FLAG) {
+        LOG_DEBUG("Handling read\n");
+        jobject buffer = env->GetObjectField(request->future, FID_posixAioReadWriteFuture_buffer);
+        if (buffer != NULL) {
+            LOG_DEBUG("Advancing buffer position\n");
+
+            jclass bufferClass = env->GetObjectClass(buffer);
+            if (getByteBufferRemaining(env, buffer, bufferClass) < totalBytes) {
+                // TODO: Throw exception
+                env->DeleteLocalRef(bufferClass);
+                env->DeleteLocalRef(buffer);
+                cleanupPosixAio(env, request);
+                return;
+            }
+
+            jint position = getByteBufferPosition(env, buffer, bufferClass);
+            bool delivered = true;
+            if (env->GetDirectBufferAddress(buffer) == NULL) {
+                // Non-direct buffer: the data was read into the request's own heap area and must be copied
+                if (byteBufferHasArray(env, buffer, bufferClass)) {
+                    jbyteArray array = getByteBufferArray(env, buffer, bufferClass);
+                    // The buffer's window starts at arrayOffset() inside the backing array, and the
+                    // read must land at the buffer's current position, not at array index 0.
+                    jmethodID MID_arrayOffset = env->GetMethodID(bufferClass, "arrayOffset", "()I");
+                    jint arrayOffset = env->CallIntMethod(buffer, MID_arrayOffset);
+                    env->SetByteArrayRegion(array, arrayOffset + position, totalBytes, request->buffers[0].buffer);
+                    env->DeleteLocalRef(array);
+                } else {
+                    // TODO: Handle byte buffers that don't support arrays
+                    delivered = false;
+                }
+            } else {
+                LOG_DEBUG("Setting buffer position in direct buffer\n");
+            }
+
+            // Advance the position for heap and direct buffers alike once the data is in place
+            if (delivered) {
+                setByteBufferPosition(env, buffer, bufferClass, position + totalBytes);
+            }
+
+            env->DeleteLocalRef(bufferClass);
+        }
+        env->DeleteLocalRef(buffer);
+    }
+
+    LOG_DEBUG("Invoke callbacks\n");
+    jobject totalBytesObj = env->CallStaticObjectMethod(Integer, MID_Integer_valueOf, totalBytes);
+    if (callbackInExecutorService) {
+        env->CallVoidMethod(request->future, MID_posixAioFtgure_setValueInExecutorService, totalBytesObj);
+    } else {
+        env->CallVoidMethod(request->future, MID_posixAioFuture_setValue, totalBytesObj);
+    }
+    env->DeleteLocalRef(totalBytesObj);
+    LOG_DEBUG("Done with callbacks\n");
+
+    cleanupPosixAio(env, request);
+}
+
+/*
+ * sigaction handler for SIG_AIO. Recovers the aio_request from the signal's sival_ptr and
+ * completes it, asking for the Java callback to be run in the executor service.
+ * NOTE(review): this performs JNI and stdio calls from signal context, which is presumably
+ * not async-signal-safe -- confirm this is acceptable.
+ */
+void aioSignalHandler(int signo, siginfo_t *info, void *context)
+{
+    // Ignore anything that is not the signal we registered for
+    if (info->si_signo != SIG_AIO) {
+        return;
+    }
+    LOG_DEBUG("In signal handler\n");
+
+    JNIEnv *jniEnv;
+    jvm->GetEnv((void**)&jniEnv, JNI_VERSION);
+    if (jniEnv != NULL) {
+        struct aio_request *pending = (struct aio_request *)info->si_value.sival_ptr;
+        processPosixAioCallback(jniEnv, pending, true);
+        return;
+    }
+    perror("Unable to obtain reference to JNI Environment, can not handle AIO completion\n");
+}
+
+/*
+ * SIGEV_THREAD notification function. Attaches the notification thread to the JVM, completes
+ * the request found in sigval.sival_ptr (calling the Java callback directly), then detaches.
+ */
+void aioThreadHandler(sigval_t sigval)
+{
+    LOG_DEBUG("In thread completion handler\n");
+
+    JNIEnv *jniEnv;
+    if (jvm->AttachCurrentThread((void**)&jniEnv, NULL) < 0) {
+        fprintf(stderr, "Could not attach JVM to AIO thread");
+        fflush(stderr);
+        return;
+    }
+
+    processPosixAioCallback(jniEnv, (aio_request *)sigval.sival_ptr, false);
+
+    jvm->DetachCurrentThread();
+}
+
+/**********************************************************************************************************************
+ *
* JNI OnLoad and OnUnload functions
*
*********************************************************************************************************************/
@@ -57,14 +377,57 @@
jclass cls;
- cls = env->FindClass("org/apache/aio/AioException");
- aioException = (jclass)env->NewWeakGlobalRef(cls);
-
- cls = env->FindClass("java/io/IOException");
- ioException = (jclass)env->NewWeakGlobalRef(cls);
+ // --- AioException -----------------------------------------------------------------
+ cls = env->FindClass("org/apache/aio/AioException"); NULL_CHECK(cls)
+ aioException = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(aioException)
+
+ // --- IOException ------------------------------------------------------------------
+ cls = env->FindClass("java/io/IOException"); NULL_CHECK(cls)
+ ioException = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(ioException)
+
+ // --- NullPointerException ---------------------------------------------------------
+ cls = env->FindClass("java/lang/NullPointerException"); NULL_CHECK(cls)
+ nullPointerException = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(nullPointerException)
+
+ // --- java.lang.Integer ------------------------------------------------------------
+ cls = env->FindClass("java/lang/Integer"); NULL_CHECK(cls)
+ Integer = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(Integer);
+ MID_Integer_valueOf = env->GetStaticMethodID(Integer, "valueOf", "(I)Ljava/lang/Integer;");
- cls = env->FindClass("java/lang/NullPointerException");
- nullPointerException = (jclass)env->NewWeakGlobalRef(cls);
+ // --- java.io.FileDescriptor -------------------------------------------------------
+ cls = env->FindClass("java/io/FileDescriptor"); NULL_CHECK(cls)
+ FID_fd = env->GetFieldID(cls, "fd", "I"); NULL_CHECK(FID_fd)
+
+ // --- org.apache.aio.posix.PosixAsynchronousFileChannel ----------------------------
+ LOG_DEBUG("Getting ids for PosixAsynchronousFileChannel\n")
+ cls = env->FindClass("org/apache/aio/posix/PosixAsynchronousFileChannel"); NULL_CHECK(cls)
+ FID_posixChannelThreadNotify = env->GetFieldID(cls, "threadNotify", "Z"); NULL_CHECK(FID_posixChannelThreadNotify)
+ FID_posixChannelFd = env->GetFieldID(cls, "fd", "Ljava/io/FileDescriptor;"); NULL_CHECK(FID_posixChannelFd)
+
+ // --- org.apache.aio.posix.PosixAioFutureImpl --------------------------------------
+ LOG_DEBUG("Getting ids for PosixAioFutureImpl\n")
+ cls = env->FindClass("org/apache/aio/posix/PosixAioFutureImpl"); NULL_CHECK(cls)
+ FID_posixAioFuture_aioRequestPtr = env->GetFieldID(cls, "aioRequestPtr", "J"); NULL_CHECK(FID_posixAioFuture_aioRequestPtr)
+ MID_posixAioFuture_setValue = env->GetMethodID(cls, "setValue", "(Ljava/lang/Object;)V"); NULL_CHECK(MID_posixAioFuture_setValue)
+ MID_posixAioFtgure_setValueInExecutorService = env->GetMethodID(cls, "setValueInExecutorService", "(Ljava/lang/Object;)V"); NULL_CHECK(MID_posixAioFuture_setValue)
+ MID_posixAioFuture_setException = env->GetMethodID(cls, "setException", "(Ljava/lang/Throwable;)V"); NULL_CHECK(MID_posixAioFuture_setException)
+
+ // --- org.apache.aio.posix.PosixAioFutureImpl.PosixReadWriteAioFuture --------------
+ LOG_DEBUG("Getting ids for PosixAioFutureImpl.PosixReadWriteAioFuture\n")
+ cls = env->FindClass("org/apache/aio/posix/PosixAioFutureImpl$PosixReadWriteAioFuture"); NULL_CHECK(cls)
+ posixAioReadWriteFuture = (jclass)env->NewWeakGlobalRef(cls); NULL_CHECK(posixAioReadWriteFuture)
+ CID_posixAioReadWriteFuture = env->GetMethodID(posixAioReadWriteFuture, "<init>", "(Lorg/apache/aio/posix/PosixAsynchronousFileChannel;JLjava/nio/ByteBuffer;J)V"); NULL_CHECK(CID_posixAioReadWriteFuture)
+ FID_posixAioReadWriteFuture_buffer = env->GetFieldID(posixAioReadWriteFuture, "buffer", "Ljava/nio/ByteBuffer;"); NULL_CHECK(FID_posixAioReadWriteFuture_buffer)
+
+ // --- Initialize signal handler
+ LOG_DEBUG("Setting up signal handler\n")
+ sigemptyset(&sig_act.sa_mask);
+ sig_act.sa_flags = SA_SIGINFO;
+ sig_act.sa_sigaction = aioSignalHandler;
+ int ret = sigaction(SIG_AIO, &sig_act, NULL);
+ if (ret != 0) {
+ env->ThrowNew(aioException, "error registering signal handler");
+ }
return JNI_VERSION;
}
@@ -79,6 +442,7 @@
env->DeleteWeakGlobalRef(aioException);
env->DeleteWeakGlobalRef(ioException);
env->DeleteWeakGlobalRef(nullPointerException);
+ env->DeleteWeakGlobalRef(posixAioReadWriteFuture);
}
/**********************************************************************************************************************
@@ -95,7 +459,76 @@
JNIEXPORT jobject JNICALL Java_org_apache_aio_posix_PosixAsynchronousFileChannel_read__Ljava_nio_ByteBuffer_2J
(JNIEnv *env, jobject self, jobject buffer, jlong position)
{
- LOG_DEBUG("In read function");
+    // Issues an asynchronous read of buffer.remaining() bytes at the given file position and returns
+    // the Java future that will be completed by the AIO notification. Direct buffers are read into
+    // in place; for other buffers the data is read into heap space owned by the request.
+    // Returns NULL (with an AioException pending where one could be raised) on failure.
+    LOG_DEBUG("In read function\n");
+
+    jclass bufferClass = env->GetObjectClass(buffer);
+
+    // Get buffer position and the number of bytes to read
+    jint bufferPosition = getByteBufferPosition(env, buffer, bufferClass);
+
+    jint bufferSize = getByteBufferRemaining(env, buffer, bufferClass);
+    jint bufferSizeToAllocate = 0;
+
+    jbyte *address = (jbyte *)env->GetDirectBufferAddress(buffer);
+    if (address == NULL) {
+        LOG_DEBUG("Using heap ByteBuffer\n");
+        bufferSizeToAllocate = bufferSize;
+    } else {
+        LOG_DEBUG("Using direct ByteBuffer\n");
+        address += bufferPosition;
+    }
+
+    struct aio_request *req = allocateRequest(1, bufferSizeToAllocate);
+    if (req == NULL) {
+        env->ThrowNew(aioException, "could not allocate aio request");
+        return NULL;
+    }
+
+    // Setup aio_request fields
+    req->buffers[0].bufferSize = bufferSize;
+    req->buffers[0].allocatedBufferSize = bufferSizeToAllocate;
+    if (address == NULL) {
+        address = getBufferAddress(req, 0);
+    }
+    req->buffers[0].buffer = address;
+
+    // Create future object
+    jobject future = createPosixReadWriteAioFuture(env, self, req, buffer, position);
+    if (future == NULL) {
+        // Error occured, release the request, return and let Java throw exception
+        cleanupPosixAio(env, req);
+        return NULL;
+    }
+    req->future = env->NewGlobalRef(future);
+    if (req->future == NULL) {
+        // Error occured, release the request, return and let Java throw exception
+        cleanupPosixAio(env, req);
+        return NULL;
+    }
+
+    // Set flag
+    req->flags = READ_FLAG;
+
+    // Setup aiocb
+    req->cb.aio_fildes = getPosixChannelFD(env, self);
+    req->cb.aio_offset = position;
+    req->cb.aio_buf = address;
+    req->cb.aio_nbytes = bufferSize;
+
+    req->cb.aio_sigevent.sigev_value.sival_ptr = req;
+    if (env->GetBooleanField(self, FID_posixChannelThreadNotify)) {
+        // Do thread notification
+        req->cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
+        req->cb.aio_sigevent.sigev_notify_function = aioThreadHandler;
+    } else {
+        // Do SIG notification
+        req->cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
+        req->cb.aio_sigevent.sigev_signo = SIG_AIO;
+    }
+
+    LOG_DEBUG("Calling aio_read\n");
+    int ret = aio_read(&req->cb);
+    if (ret) {
+        // TODO Handle each error return type as appropriate
+        perror("Error issuing aio_read");
+        cleanupPosixAio(env, req);
+        env->ThrowNew(aioException, "error issuing aio_read request");
+        // req has just been freed and must not be touched again
+        return NULL;
+    }
+
+    // Once aio_read succeeded the completion handler may already have run and freed req,
+    // so hand back the local reference instead of reading req->future.
+    return future;
}
/*
@@ -141,6 +574,29 @@
{
LOG_DEBUG("In batch write function");
+}
+
+/**********************************************************************************************************************
+ *
+ * org.apache.aio.posix.PosixAioFutureImpl method
+ *
+ *********************************************************************************************************************/
+
+/*
+ * Class: org_apache_aio_posix_PosixAioFutureImpl
+ * Method: cancel0
+ * Signature: ()Z
+ *
+ * Asks the OS to cancel the request referenced by the future's aioRequestPtr field.
+ * Returns true only if aio_cancel reports AIO_CANCELED.
+ */
+JNIEXPORT jboolean JNICALL Java_org_apache_aio_posix_PosixAioFutureImpl_cancel0
+ (JNIEnv *env, jobject self)
+{
+    struct aio_request *request = (aio_request *)env->GetLongField(self, FID_posixAioFuture_aioRequestPtr);
+    if (request == NULL) {
+        // Nothing to cancel
+        return JNI_FALSE;
+    }
+
+    int ret = aio_cancel(request->cb.aio_fildes, &request->cb);
+
+    // A successfully cancelled operation still gets its normal completion notification, and
+    // processPosixAioCallback (ECANCELED path) frees the request. Freeing it here as well
+    // would release it twice.
+    return ret == AIO_CANCELED ? JNI_TRUE : JNI_FALSE;
}
/**********************************************************************************************************************
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Sat Feb 24 14:43:51 2007
@@ -19,6 +19,7 @@
*/
package org.apache.aio;
+import java.nio.ByteBuffer;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -46,7 +47,10 @@
int getLength();
}
- public static interface ByteBufferFuture extends AioFuture<Integer>, ByteBufferPosition {}
+ /**
+  * An {@link AioFuture} with an <tt>Integer</tt> result that exposes the buffer and position it was created with.
+  */
+ public static interface ByteBufferFuture extends AioFuture<Integer> {
+ /** @return The buffer associated with this future. */
+ public ByteBuffer getBuffer();
+ /** @return The position associated with this future. */
+ public long getPosition();
+ }
/**
* Associates a {@link AioCompletionHandler} with this future. The completion handler will be invoked when
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/ByteBufferPosition.java Sat Feb 24 14:43:51 2007
@@ -26,19 +26,31 @@
*
* @author mheath
*/
-public interface ByteBufferPosition {
+public final class ByteBufferPosition {
+ private final ByteBuffer buffer;
+ private final long position;
+
+ public ByteBufferPosition(ByteBuffer buffer, long position) {
+ this.buffer = buffer;
+ this.position = position;
+ }
+
/**
* Fetches the buffer associated with this <tt>ByteBufferPosition</tt>.
*
* @return The buffer associated with this <tt>ByteBufferPosition</tt>.
*/
- ByteBuffer getByteBuffer();
+ public final ByteBuffer getBuffer() {
+ return buffer;
+ }
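/**
* Fetches the position associated with this <tt>ByteBufferPosition</tt>.
*
* @return The position associated with this <tt>ByteBufferPosition</tt>.
*/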
- long getPosition();
+ public final long getPosition() {
+ return position;
+ }
}
Added: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java?view=auto&rev=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java (added)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java Sat Feb 24 14:43:51 2007
@@ -0,0 +1,45 @@
+package org.apache.aio.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.aio.AioCompletionHandler;
+import org.apache.aio.AioFuture;
+
+/**
+ * Base class for {@link AioFuture} implementations that keeps the list of registered
+ * {@link AioCompletionHandler}s and an attachment object.
+ *
+ * @param <V> the result type of the future
+ */
+public abstract class AbstractAioFutureCallBackHandler<V> implements AioFuture<V> {
+
+    private final List<AioCompletionHandler<V>> completionHandlers = new ArrayList<AioCompletionHandler<V>>();
+    private Object attachment;
+
+    public AbstractAioFutureCallBackHandler() {
+        super();
+    }
+
+    /**
+     * Registers a handler to be invoked by {@link #callCompletionHandlers()}.
+     *
+     * @param completionHandler the handler to add
+     */
+    public synchronized void addCompletionHandler(AioCompletionHandler<V> completionHandler) {
+        completionHandlers.add(completionHandler);
+    }
+
+    /**
+     * Unregisters a previously added handler.
+     *
+     * @param completionHandler the handler to remove
+     * @return <tt>true</tt> if the handler was registered
+     */
+    public synchronized boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler) {
+        return completionHandlers.remove(completionHandler);
+    }
+
+    /**
+     * Invokes every handler registered at the time of the call, passing this future.
+     * A handler that throws does not prevent the remaining handlers from running.
+     */
+    public void callCompletionHandlers() {
+        // Iterate over a snapshot taken under the lock: a handler may add or remove handlers
+        // (including itself) from within onCompletion, which would otherwise modify the list
+        // during iteration, and handler code should not run while the monitor is held.
+        final List<AioCompletionHandler<V>> snapshot;
+        synchronized (this) {
+            snapshot = new ArrayList<AioCompletionHandler<V>>(completionHandlers);
+        }
+        for (AioCompletionHandler<V> completionHandler : snapshot) {
+            try {
+                completionHandler.onCompletion(this);
+            } catch (Throwable e) {
+                // TODO Handle exception
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * @return the attachment last stored with {@link #setAttachment(Object)}, or <tt>null</tt>
+     */
+    public Object getAttachment() {
+        return attachment;
+    }
+
+    /**
+     * @param attachment the object to associate with this future
+     */
+    public void setAttachment(Object attachment) {
+        this.attachment = attachment;
+    }
+
+}
\ No newline at end of file
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java Sat Feb 24 14:43:51 2007
@@ -20,20 +20,17 @@
package org.apache.aio.concurrent;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.aio.AioCallbackException;
-import org.apache.aio.AioCompletionHandler;
import org.apache.aio.AioFuture;
import org.apache.aio.AsynchronousFileChannel;
import org.apache.aio.ByteBufferPosition;
-public class AioFutureImpl<V> implements AioFuture<V> {
+public class AioFutureImpl<V> extends AbstractAioFutureCallBackHandler<V> implements AioFuture<V> {
public static class BatchFutureImpl extends AioFutureImpl<Long> implements BatchFuture {
@@ -80,16 +77,16 @@
public static class ByteBufferFutureImpl extends AioFutureImpl<Integer> implements ByteBufferFuture {
- private final ByteBuffer byteBuffer;
+ private final ByteBuffer buffer;
private final long position;
- public ByteBufferFutureImpl(ByteBuffer byteBuffer, long position) {
- this.byteBuffer = byteBuffer;
+ public ByteBufferFutureImpl(ByteBuffer buffer, long position) {
+ this.buffer = buffer;
this.position = position;
}
- public ByteBuffer getByteBuffer() {
- return byteBuffer;
+ public ByteBuffer getBuffer() {
+ return buffer;
}
public long getPosition() {
@@ -98,23 +95,10 @@
}
-// public static class LockFutureImpl extends AbstractAioFuture<FileLock, LockFuture> implements LockFuture {
-//
-// }
-//
-// public static class OpenFutureImpl extends AbstractAioFuture<AsynchronousFileChannel, OpenFuture> implements OpenFuture {
-//
-// }
-//
-// public static class SyncFutureImpl
-//
-// public static class TruncateFutureImpl
-//
private Future<V> future;
private Object attachment = null;
private AsynchronousFileChannel asynchronousFileChannel = null;
- private final List<AioCompletionHandler<V>> completionHandlers = new ArrayList<AioCompletionHandler<V>>();
- private ExecutionException exception;
+ ExecutionException exception;
public AioFutureImpl() {
// Default constructor
@@ -122,24 +106,6 @@
public AioFutureImpl(AsynchronousFileChannel channel) {
this.asynchronousFileChannel = channel;
- }
-
- public synchronized void addCompletionHandler(AioCompletionHandler<V> completionHandler) {
- completionHandlers.add(completionHandler);
- }
-
- public synchronized boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler) {
- return completionHandlers.remove(completionHandler);
- }
-
- public synchronized void callCompletionHandlers() {
- try {
- for (AioCompletionHandler<V> completionHandler : completionHandlers) {
- completionHandler.onCompletion(this);
- }
- } catch (Throwable e) {
- exception = new ExecutionException(e);
- }
}
public V get() throws InterruptedException, ExecutionException, AioCallbackException {
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java Sat Feb 24 14:43:51 2007
@@ -68,7 +68,7 @@
Future<FileLock> future = executorService.submit(new Callable<FileLock>() {
public FileLock call() throws Exception {
FileLock lock = channel.lock(position, size, shared);
- callCompletionHandlers(aioFuture);
+ callCompletionHandlersInExecutorService(aioFuture);
return lock;
}
@@ -86,7 +86,7 @@
Future<Integer> future = executorService.submit(new Callable<Integer>() {
public Integer call() throws Exception {
int bytesRead = channel.read(buffer, position);
- callCompletionHandlers(byteBufferFuture);
+ callCompletionHandlersInExecutorService(byteBufferFuture);
return bytesRead;
}
});
@@ -108,9 +108,9 @@
long length = 0;
for (int i = 0; i < length; i++) {
ByteBufferPosition byteBufferPosition = byteBufferPositions[i + offset];
- length += channel.read(byteBufferPosition.getByteBuffer(), byteBufferPosition.getPosition());
+ length += channel.read(byteBufferPosition.getBuffer(), byteBufferPosition.getPosition());
}
- callCompletionHandlers(batchFuture);
+ callCompletionHandlersInExecutorService(batchFuture);
return length;
}
});
@@ -133,7 +133,7 @@
Future<Void> future = executorService.submit(new Callable<Void>() {
public Void call() throws Exception {
channel.force(metaData);
- callCompletionHandlers(aioFuture);
+ callCompletionHandlersInExecutorService(aioFuture);
return null;
}
});
@@ -150,7 +150,7 @@
Future<Void> future = executorService.submit(new Callable<Void>() {
public Void call() throws Exception {
channel.truncate(size);
- callCompletionHandlers(aioFuture);
+ callCompletionHandlersInExecutorService(aioFuture);
return null;
}
});
@@ -176,7 +176,7 @@
Future<Integer> future = executorService.submit(new Callable<Integer>() {
public Integer call() throws Exception {
int bytesWritten = channel.write(buffer, position);
- callCompletionHandlers(byteBufferFuture);
+ callCompletionHandlersInExecutorService(byteBufferFuture);
return bytesWritten;
}
});
@@ -198,9 +198,9 @@
long length = 0;
for (int i = 0; i < length; i++) {
ByteBufferPosition byteBufferPosition = byteBufferPositions[i + offset];
- length += channel.write(byteBufferPosition.getByteBuffer(), byteBufferPosition.getPosition());
+ length += channel.write(byteBufferPosition.getBuffer(), byteBufferPosition.getPosition());
}
- callCompletionHandlers(batchFuture);
+ callCompletionHandlersInExecutorService(batchFuture);
return length;
}
});
@@ -212,7 +212,7 @@
return channel.isOpen();
}
- private void callCompletionHandlers(final AioFutureImpl aioFuture) {
+ public void callCompletionHandlersInExecutorService(final AbstractAioFutureCallBackHandler aioFuture) {
executorService.submit(new Runnable() {
public void run() {
aioFuture.callCompletionHandlers();
@@ -220,4 +220,8 @@
});
}
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
}
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannelProvider.java Sat Feb 24 14:43:51 2007
@@ -20,6 +20,7 @@
package org.apache.aio.concurrent;
import java.io.File;
+import java.io.FileDescriptor;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.util.Properties;
@@ -36,19 +37,17 @@
public class ConcurrentAsynchronousFileChannelProvider implements AsynchronousFileChannelProvider {
- public static final String PROPERTY_DEFAULT_THREAD_POOL_SIZE =
- ConcurrentAsynchronousFileChannelProvider.class.getName() + ".defaultThreadPoolSize";
+ public static final String PROPERTY_DEFAULT_THREAD_POOL_SIZE = ConcurrentAsynchronousFileChannelProvider.class
+ .getName()
+ + ".defaultThreadPoolSize";
private static final Integer DEFAULT_THREAD_POOL_SIZE = (Runtime.getRuntime().availableProcessors() + 1) * 2;
private static ExecutorService staticExecutorService;
- public AioFuture<AsynchronousFileChannel> open(
- final File file,
- final Modes mode,
- final ExecutorService executorService,
- final Properties properties) {
-
+ public AioFuture<AsynchronousFileChannel> open(final File file, final Modes mode,
+ final ExecutorService executorService, final Properties properties) {
+
// Determine which ExecutorService to use
final ExecutorService service;
if (executorService == null) {
@@ -56,31 +55,34 @@
} else {
service = executorService;
}
-
+
final AioFutureImpl<AsynchronousFileChannel> aioFuture = new AioFutureImpl<AsynchronousFileChannel>();
-
+
Future<AsynchronousFileChannel> future = service.submit(new Callable<AsynchronousFileChannel>() {
public AsynchronousFileChannel call() throws Exception {
- final FileChannel channel = new RandomAccessFile(file, mode.getModeString()).getChannel();
- AsynchronousFileChannel afc = createAsynchronouseFileChannel(channel, mode, service);
-
+ final RandomAccessFile raf = new RandomAccessFile(file, mode.getModeString());
+ final FileChannel channel = raf.getChannel();
+ AsynchronousFileChannel afc = createAsynchronouseFileChannel(raf.getFD(), channel, mode, service,
+ properties);
+
// Call completion handlers
service.submit(new Runnable() {
- public void run() {
- aioFuture.callCompletionHandlers();
- }
+ public void run() {
+ aioFuture.callCompletionHandlers();
+ }
});
-
+
return afc;
}
});
aioFuture.setFuture(future);
-
+
return aioFuture;
}
- protected AsynchronousFileChannel createAsynchronouseFileChannel(final FileChannel channel, final Modes mode, final ExecutorService service) {
+ protected AsynchronousFileChannel createAsynchronouseFileChannel(final FileDescriptor fd,
+ final FileChannel channel, final Modes mode, final ExecutorService service, final Properties properties) {
return new ConcurrentAsynchronousFileChannel(channel, mode, service);
}
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/linux/LinuxAsynchronousFileChannelProvider.java Sat Feb 24 14:43:51 2007
@@ -19,7 +19,9 @@
*/
package org.apache.aio.linux;
+import java.io.FileDescriptor;
import java.nio.channels.FileChannel;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.aio.AsynchronousFileChannel;
@@ -34,7 +36,8 @@
*/
public class LinuxAsynchronousFileChannelProvider extends ConcurrentAsynchronousFileChannelProvider {
@Override
- protected AsynchronousFileChannel createAsynchronouseFileChannel(FileChannel channel, Modes mode, ExecutorService service) {
+ protected AsynchronousFileChannel createAsynchronouseFileChannel(FileDescriptor fd, FileChannel channel,
+ Modes mode, ExecutorService service, Properties properties) {
return new LinuxAsynchronousFileChannel(channel, mode, service);
}
}
Added: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java?view=auto&rev=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java (added)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAioFutureImpl.java Sat Feb 24 14:43:51 2007
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.aio.posix;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.aio.AioCallbackException;
+import org.apache.aio.AsynchronousFileChannel;
+import org.apache.aio.concurrent.AbstractAioFutureCallBackHandler;
+
+/**
+ * Future for one asynchronous request issued through a {@link PosixAsynchronousFileChannel}.
+ *
+ * The future is completed by {@link #setValue(Object)}, {@link #setException(Throwable)} or a
+ * successful {@link #cancel(boolean)}. The result fields are volatile so that they can be read
+ * without holding {@code lock}; the lock and its condition are only used to park and wake timed
+ * waiters.
+ *
+ * NOTE(review): the completion methods are presumably invoked from the native aio code, which is
+ * not visible here -- confirm which thread (or signal handler) calls them.
+ *
+ * @param <V> type of the value produced by the request
+ */
+public class PosixAioFutureImpl<V> extends AbstractAioFutureCallBackHandler<V> {
+
+    /**
+     * Future for a single-buffer read or write. It remembers the buffer and the file position the
+     * request was issued with so that completion handlers can retrieve them.
+     */
+    public static class PosixReadWriteAioFuture extends PosixAioFutureImpl<Integer> implements ByteBufferFuture {
+
+        private final ByteBuffer buffer;
+        private final long position;
+
+        /**
+         * @param channel       channel the request was issued on
+         * @param aioRequestPtr pointer to the native aio request struct
+         * @param buffer        buffer the request reads into or writes from
+         * @param position      file position of the request
+         */
+        public PosixReadWriteAioFuture(PosixAsynchronousFileChannel channel, long aioRequestPtr, ByteBuffer buffer, long position) {
+            super(channel, aioRequestPtr);
+            this.buffer = buffer;
+            this.position = position;
+        }
+
+        /** @return the buffer passed to the constructor */
+        public ByteBuffer getBuffer() {
+            return buffer;
+        }
+
+        /** @return the file position passed to the constructor */
+        public long getPosition() {
+            return position;
+        }
+
+    }
+
+    private final PosixAsynchronousFileChannel channel;
+    @SuppressWarnings("unused")
+    private final long aioRequestPtr; // The pointer to the aio request struct (used by native code)
+
+    // Used only to park/wake callers of get(long, TimeUnit); the state below is volatile.
+    private final Lock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    // Publication order matters: value/exception are always written BEFORE done, so a reader that
+    // observes done == true also observes the matching result.
+    private volatile Throwable exception;
+    private volatile boolean done = false;
+    private volatile V value = null;
+
+    /**
+     * @param channel       channel the request was issued on
+     * @param aioRequestPtr pointer to the native aio request struct
+     */
+    public PosixAioFutureImpl(PosixAsynchronousFileChannel channel, long aioRequestPtr) {
+        this.channel = channel;
+        this.aioRequestPtr = aioRequestPtr;
+    }
+
+    /**
+     * Waits until the request has completed and returns its result.
+     *
+     * The wait polls {@code done} and yields between polls. It deliberately does not hold
+     * {@code lock} while polling, so timed waiters on other threads can still time out.
+     *
+     * @return the value set by {@link #setValue(Object)}
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException   if the request completed with an exception or was cancelled
+     */
+    public V get() throws InterruptedException, ExecutionException, AioCallbackException {
+        // TODO - Come up with a better blocking mechanism than a spin lock that works with signals
+        while (!done) {
+            if (Thread.interrupted()) {
+                throw new InterruptedException();
+            }
+            Thread.yield();
+        }
+        return getValue();
+    }
+
+    /**
+     * Waits at most the given time for the request to complete and returns its result.
+     *
+     * @param timeout maximum time to wait
+     * @param unit    unit of {@code timeout}
+     * @return the value set by {@link #setValue(Object)}
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException   if the request completed with an exception or was cancelled
+     * @throws TimeoutException     if the request has not completed when the timeout elapses
+     */
+    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+        long remainingNanos = unit.toNanos(timeout);
+        lock.lockInterruptibly();
+        try {
+            // Loop: await may return early (spurious wakeup) or because the time ran out.
+            while (!done) {
+                if (remainingNanos <= 0L) {
+                    throw new TimeoutException();
+                }
+                remainingNanos = condition.awaitNanos(remainingNanos);
+            }
+            return getValue();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** @return the channel this future was created with */
+    public AsynchronousFileChannel getChannel() {
+        return channel;
+    }
+
+    /**
+     * Attempts to cancel the request through the native {@code cancel0()} call.
+     *
+     * @param mayInterruptIfRunning not used by this implementation
+     * @return {@code true} if the native cancel succeeded; {@code false} if the future was
+     *         already done or the native cancel failed
+     */
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        if (done) {
+            return false;
+        }
+        boolean canceled = cancel0();
+        if (canceled) {
+            // Publish the exception before done so that no reader sees a "successful" null result.
+            exception = new CancellationException();
+            done = true;
+            lock.lock();
+            try {
+                condition.signalAll();
+            } finally {
+                lock.unlock();
+            }
+        }
+        return canceled;
+    }
+
+    private native boolean cancel0();
+
+    /** @return {@code true} if this future completed through a successful {@link #cancel(boolean)} */
+    public boolean isCancelled() {
+        return done && exception instanceof CancellationException;
+    }
+
+    /** @return {@code true} once a value or an exception has been set, or the request was cancelled */
+    public boolean isDone() {
+        return done;
+    }
+
+    /**
+     * Returns the stored value, or throws the stored failure.
+     *
+     * NOTE(review): a cancellation is reported wrapped in an ExecutionException here, whereas
+     * java.util.concurrent.Future.get() specifies a bare CancellationException -- confirm which
+     * behavior callers rely on before changing it.
+     *
+     * @return the value set by {@link #setValue(Object)}
+     * @throws ExecutionException the stored exception, wrapped unless it already is an ExecutionException
+     */
+    protected V getValue() throws ExecutionException {
+        if (exception != null) {
+            if (exception instanceof ExecutionException) {
+                throw (ExecutionException)exception;
+            }
+            throw new ExecutionException(exception);
+        }
+        return value;
+    }
+
+    /**
+     * Completes this future with a value, wakes waiters and calls the completion handlers.
+     *
+     * @param value result of the request
+     */
+    protected void setValue(V value) {
+        this.value = value;
+        done = true;
+        signal();
+    }
+
+    /**
+     * Submits a task to the channel's executor service that calls {@link #setValue(Object)}.
+     *
+     * @param value result of the request
+     */
+    protected void setValueInExecutorService(final V value) {
+        channel.getExecutorService().submit(new Runnable() {
+            public void run() {
+                setValue(value);
+            }
+        });
+    }
+
+    /**
+     * Completes this future with a failure, wakes waiters and calls the completion handlers.
+     *
+     * @param exception cause of the failure
+     */
+    public void setException(Throwable exception) {
+        this.exception = exception;
+        done = true;
+        signal();
+    }
+
+    /** Wakes every thread parked in get(long, TimeUnit), then calls the completion handlers. */
+    private void signal() {
+        lock.lock();
+        try {
+            condition.signalAll();
+        } finally {
+            lock.unlock();
+        }
+
+        callCompletionHandlers();
+    }
+
+}
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java Sat Feb 24 14:43:51 2007
@@ -1,5 +1,6 @@
package org.apache.aio.posix;
+import java.io.FileDescriptor;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.ExecutorService;
@@ -14,8 +15,15 @@
public class PosixAsynchronousFileChannel extends ConcurrentAsynchronousFileChannel {
- public PosixAsynchronousFileChannel(FileChannel channel, Modes mode, ExecutorService executorService) {
+ @SuppressWarnings("unused") // This is used by native code
+ private final boolean threadNotify;
+ @SuppressWarnings("unused")
+ private final FileDescriptor fd;
+
+ public PosixAsynchronousFileChannel(FileDescriptor fd, FileChannel channel, Modes mode, ExecutorService executorService, boolean threadNotify) {
super(channel, mode, executorService);
+ this.fd = fd;
+ this.threadNotify = threadNotify;
}
@Override
@@ -33,4 +41,8 @@
@Override
public native BatchFuture write(ByteBufferPosition[] byteBufferPositions, int offset, int length);
+ public void test() {
+ System.out.println("This is from Java");
+ System.out.println("I love JNI");
+ }
}
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannelProvider.java Sat Feb 24 14:43:51 2007
@@ -19,7 +19,9 @@
*/
package org.apache.aio.posix;
+import java.io.FileDescriptor;
import java.nio.channels.FileChannel;
+import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.aio.AsynchronousFileChannel;
@@ -28,9 +30,18 @@
public class PosixAsynchronousFileChannelProvider extends ConcurrentAsynchronousFileChannelProvider {
+ public static final String PROPERTY_THREAD_NOTIFY = PosixAsynchronousFileChannel.class.getName() + ".threadNotify";
+
@Override
- protected AsynchronousFileChannel createAsynchronouseFileChannel(FileChannel channel, Modes mode, ExecutorService service) {
- return new PosixAsynchronousFileChannel(channel, mode, service);
+ protected AsynchronousFileChannel createAsynchronouseFileChannel(FileDescriptor fd, FileChannel channel,
+         Modes mode, ExecutorService service, Properties properties) {
+     // The system property supplies the fallback; it is "false" when not set.
+     final String fallback = System.getProperty(PROPERTY_THREAD_NOTIFY, Boolean.FALSE.toString());
+     // A per-open Properties entry, when supplied, takes precedence over the system property.
+     final String notifySetting = (properties != null)
+             ? properties.getProperty(PROPERTY_THREAD_NOTIFY, fallback)
+             : fallback;
+     return new PosixAsynchronousFileChannel(fd, channel, mode, service, Boolean.valueOf(notifySetting));
 }
-
}
Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java Sat Feb 24 14:43:51 2007
@@ -56,7 +56,7 @@
readFuture.addCompletionHandler(new AioCompletionHandler<Integer>() {
public void onCompletion(AioFuture<Integer> future) {
ByteBufferFuture byteBufferFuture = (ByteBufferFuture)future;
- ByteBuffer buffer = byteBufferFuture.getByteBuffer();
+ ByteBuffer buffer = byteBufferFuture.getBuffer();
assert buffer.position() == buffer.limit();
buffer.flip();
Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java Sat Feb 24 14:43:51 2007
@@ -1,6 +1,11 @@
package org.apache.aio.posix;
-import org.apache.aio.linux.LinuxAsynchronousFileChannel;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+
+import org.apache.aio.AioFuture;
+import org.apache.aio.Modes;
public class TestPosixFileChannel {
@@ -8,13 +13,21 @@
System.loadLibrary("aioj");
}
- public static void main(String[] args) {
- PosixAsynchronousFileChannel channel = new PosixAsynchronousFileChannel(null, null, null);
- channel.read(null, 0L);
-
- LinuxAsynchronousFileChannel linuxChannel = new LinuxAsynchronousFileChannel(null, null, null);
- linuxChannel.read(null, 0L);
-
+ /**
+  * Manual smoke test: issues one asynchronous read of up to 1000 bytes from /etc/passwd at
+  * position 0 through a PosixAsynchronousFileChannel (thread notification enabled), waits for
+  * it to finish and prints the buffer contents.
+  *
+  * @param args not used
+  * @throws Exception if opening the file, the read or the wait fails
+  */
+ public static void main(String[] args) throws Exception {
+     FileInputStream in = new FileInputStream("/etc/passwd");
+     java.util.concurrent.ExecutorService executor = Executors.newCachedThreadPool();
+     try {
+         PosixAsynchronousFileChannel channel = new PosixAsynchronousFileChannel(
+                 in.getFD(),
+                 in.getChannel(),
+                 Modes.READ_ONLY,
+                 executor, true);
+         ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
+         AioFuture<Integer> future = channel.read(buffer, 0L);
+         future.get();
+         buffer.flip();
+         byte[] bytes = new byte[buffer.remaining()];
+         buffer.get(bytes);
+         System.out.println(new String(bytes));
+         System.out.println("Done!");
+     } finally {
+         // Without shutdown() the pool's non-daemon worker threads can delay JVM exit.
+         executor.shutdown();
+         // Closing the stream also closes its channel and file descriptor.
+         in.close();
+     }
 }
}
Modified: mina/sandbox/mheath/aioj/trunk/todo.txt
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/todo.txt?view=diff&rev=511365&r1=511364&r2=511365
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/todo.txt (original)
+++ mina/sandbox/mheath/aioj/trunk/todo.txt Sat Feb 24 14:43:51 2007
@@ -1,15 +1,12 @@
- Add support for heap buffers
-- Throw IO Exceptions when aio calls return an error
+- Throw AIOExceptions when aio calls return an error
- Figure out batch requests
- Add support for timeouts -- does AIO itself have support for timeouts?
- Add support for receiving file modification events
- Make sure we're checking for errors on all JNI method calls
-- Modify AIOFuture to extend Java 5 Future
-- Look into simplifying AioFuture using generics
-- Update ByteBuffer immediately
- Add support for aio_fsync
-- Provide an asynchronous open
- Add support for using Groovy Closures for observers
+- Add support for prefetching ByteBuffer method IDs
=== Testing ===
- Make sure the AsynchronousFileChannel can be unloaded without holding onto native references