You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/01/31 05:49:35 UTC
svn commit: r501727 - in /mina/sandbox/mheath/aioj/trunk/src:
main/java/org/apache/aio/ main/java/org/apache/aio/concurrent/
test/java/org/apache/aio/concurrent/
Author: mheath
Date: Tue Jan 30 20:49:34 2007
New Revision: 501727
URL: http://svn.apache.org/viewvc?view=rev&rev=501727
Log:
Added read test.
Made some API changes.
Added:
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java
Modified:
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java Tue Jan 30 20:49:34 2007
@@ -29,7 +29,7 @@
*
* @param <T> The type of future object to be processed.
*/
-public interface AioCompletionHandler<T extends AioFuture> {
+public interface AioCompletionHandler<T> {
/**
* The method invoked when the operation completes.
@@ -39,6 +39,6 @@
*
* @param future The future object representing the file channel operation.
*/
- void onCompletion(T future);
+ void onCompletion(AioFuture<T> future);
}
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Tue Jan 30 20:49:34 2007
@@ -39,22 +39,14 @@
*/
public interface AioFuture<V> extends Future<V> {
- static interface BatchFuture extends AioFuture<Long> {
+ public static interface BatchFuture extends AioFuture<Long> {
ByteBufferPosition[] getBatch();
ByteBufferPosition[] getUsedBatch();
int getOffset();
int getLength();
}
- static interface ByteBufferFuture extends AioFuture<Integer>, ByteBufferPosition {}
-
- //static interface LockFuture extends AioFuture<FileLock, LockFuture> {}
-
- //static interface OpenFuture extends AioFuture<AsynchronousFileChannel, OpenFuture> {}
-
- //static interface SyncFuture extends AioFuture<Void, SyncFuture> {}
-
- //static interface TruncateFuture extends AioFuture<Void, TruncateFuture> {}
+ public static interface ByteBufferFuture extends AioFuture<Integer>, ByteBufferPosition {}
/**
* Associates a {@link AioCompletionHandler} with this future. The completion handler will be invoked when
@@ -62,7 +54,7 @@
*
* @param completionHandler The completion handler to be invoked.
*/
- void addCompletionHandler(AioCompletionHandler<AioFuture> completionHandler);
+ void addCompletionHandler(AioCompletionHandler<V> completionHandler);
/**
* Removes a {@link AioCompletionHandler} from this future.
@@ -70,7 +62,7 @@
* @param completionHandler The completion handler to be disassociated with this future
* @return Returns true if the completion handler was disassociated with this future, false otherwise.
*/
- boolean removeCompletionHandler(AioCompletionHandler<AioFuture> completionHandler);
+ boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler);
/**
* Returns the {@link AsynchronousFileChannel} where the operation represented by this future originated.
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java Tue Jan 30 20:49:34 2007
@@ -113,7 +113,7 @@
private Future<V> future;
private Object attachment = null;
private AsynchronousFileChannel asynchronousFileChannel = null;
- private final List<AioCompletionHandler<AioFuture>> completionHandlers = new ArrayList<AioCompletionHandler<AioFuture>>();
+ private final List<AioCompletionHandler<V>> completionHandlers = new ArrayList<AioCompletionHandler<V>>();
private ExecutionException exception;
public AioFutureImpl() {
@@ -124,17 +124,17 @@
this.asynchronousFileChannel = channel;
}
- public synchronized void addCompletionHandler(AioCompletionHandler<AioFuture> completionHandler) {
+ public synchronized void addCompletionHandler(AioCompletionHandler<V> completionHandler) {
completionHandlers.add(completionHandler);
}
- public synchronized boolean removeCompletionHandler(AioCompletionHandler<AioFuture> completionHandler) {
+ public synchronized boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler) {
return completionHandlers.remove(completionHandler);
}
public synchronized void callCompletionHandlers() {
try {
- for (AioCompletionHandler<AioFuture> completionHandler : completionHandlers) {
+ for (AioCompletionHandler<V> completionHandler : completionHandlers) {
completionHandler.onCompletion(this);
}
} catch (Throwable e) {
Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java Tue Jan 30 20:49:34 2007
@@ -58,6 +58,12 @@
@Override
public AioFuture<FileLock> lock(final long position, final long size, final boolean shared) throws AioException {
+ if (position < 0) {
+ throw new IllegalArgumentException("position can not be negative");
+ }
+ if (size < 0) {
+ throw new IllegalArgumentException("size can not be negative");
+ }
final AioFutureImpl<FileLock> aioFuture = new AioFutureImpl<FileLock>(this);
Future<FileLock> future = executorService.submit(new Callable<FileLock>() {
public FileLock call() throws Exception {
@@ -73,6 +79,9 @@
@Override
public ByteBufferFuture read(final ByteBuffer buffer, final long position) throws AioException {
+ if (position < 0) {
+ throw new IllegalArgumentException("position can not be negative");
+ }
final ByteBufferFutureImpl byteBufferFuture = new ByteBufferFutureImpl(buffer, position);
Future<Integer> future = executorService.submit(new Callable<Integer>() {
public Integer call() throws Exception {
@@ -87,6 +96,12 @@
@Override
public BatchFuture read(final ByteBufferPosition[] byteBufferPositions, final int offset, final int length) throws AioException {
+ if (offset < 0) {
+ throw new IllegalArgumentException("offset can not be null");
+ }
+ if (length < 0) {
+ throw new IllegalArgumentException("length can not be null");
+ }
final BatchFutureImpl batchFuture = new BatchFutureImpl(byteBufferPositions, offset, length);
Future<Long> future = executorService.submit(new Callable<Long>() {
public Long call() throws Exception {
@@ -128,6 +143,9 @@
@Override
public AioFuture<Void> truncate(final long size) throws AioException {
+ if (size < 0) {
+ throw new IllegalArgumentException("size can not be negative");
+ }
final AioFutureImpl<Void> aioFuture = new AioFutureImpl<Void>();
Future<Void> future = executorService.submit(new Callable<Void>() {
public Void call() throws Exception {
@@ -143,7 +161,7 @@
@Override
public FileLock tryLock(long position, long size, boolean shared) throws AioException {
try {
- return channel.tryLock();
+ return channel.tryLock(position, size, shared);
} catch (IOException e) {
throw new AioException(e);
}
@@ -151,6 +169,9 @@
@Override
public ByteBufferFuture write(final ByteBuffer buffer, final long position) {
+ if (position < 0) {
+ throw new IllegalArgumentException("position can not be null");
+ }
final ByteBufferFutureImpl byteBufferFuture = new ByteBufferFutureImpl(buffer, position);
Future<Integer> future = executorService.submit(new Callable<Integer>() {
public Integer call() throws Exception {
@@ -165,6 +186,12 @@
@Override
public BatchFuture write(final ByteBufferPosition[] byteBufferPositions, final int offset, final int length) {
+ if (offset < 0) {
+ throw new IllegalArgumentException("offset can not be negative");
+ }
+ if (length < 0) {
+ throw new IllegalArgumentException("length can not be negative");
+ }
final BatchFutureImpl batchFuture = new BatchFutureImpl(byteBufferPositions, offset, length);
Future<Long> future = executorService.submit(new Callable<Long>() {
public Long call() throws Exception {
Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java Tue Jan 30 20:49:34 2007
@@ -57,7 +57,7 @@
final Semaphore sem = new Semaphore(callbackCount);
sem.acquire(callbackCount);
for (int i = 0; i < callbackCount; i++) {
- future.addCompletionHandler(new AioCompletionHandler<AioFuture>() {
+ future.addCompletionHandler(new AioCompletionHandler<AsynchronousFileChannel>() {
public void onCompletion(AioFuture future) {
sem.release();
}
Added: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java?view=auto&rev=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java (added)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java Tue Jan 30 20:49:34 2007
@@ -0,0 +1,74 @@
+package org.apache.aio.concurrent;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.aio.AioCompletionHandler;
+import org.apache.aio.AioFuture;
+import org.apache.aio.AsynchronousFileChannel;
+import org.apache.aio.AsynchronousFileChannelFactory;
+import org.apache.aio.Modes;
+import org.apache.aio.AioFuture.ByteBufferFuture;
+import org.testng.annotations.Test;
+
+public class TestReads {
+
+ private void writeToFile(File file, int count) throws IOException {
+ FileOutputStream out = new FileOutputStream(file);
+ FileChannel channel = out.getChannel();
+ ByteBuffer buffer = ByteBuffer.allocate(count * 4);
+ for (int i = 0; i < count; i++) {
+ buffer.putInt(i);
+ }
+ buffer.flip();
+ channel.write(buffer);
+ out.close();
+ }
+
+ protected void checkBuffer(ByteBuffer buffer, int start, int length) {
+ for (int i = 0; i < length; i++) {
+ assert (start + i) == buffer.getInt();
+ }
+ }
+
+ @Test(groups={"concurrent", "read", "callback"})
+ public void simpleRead() throws Exception {
+ File file = File.createTempFile("temp", "aio");
+
+ // Write data to file
+ final int intCount = 5;
+ writeToFile(file, intCount);
+
+ AioFuture<AsynchronousFileChannel> future = AsynchronousFileChannelFactory.open(file, Modes.READ_ONLY);
+ AsynchronousFileChannel channel = future.get();
+
+ ByteBuffer buffer = ByteBuffer.allocate(intCount * 4);
+
+ final Semaphore sem = new Semaphore(1);
+ final AtomicBoolean completed = new AtomicBoolean(false); // Create AtomicBoolean to use as a mutable boolean
+ sem.acquire();
+ AioFuture<Integer> readFuture = channel.read(buffer, 0);
+ readFuture.addCompletionHandler(new AioCompletionHandler<Integer>() {
+ public void onCompletion(AioFuture<Integer> future) {
+ ByteBufferFuture byteBufferFuture = (ByteBufferFuture)future;
+ ByteBuffer buffer = byteBufferFuture.getByteBuffer();
+ assert buffer.position() == buffer.limit();
+
+ buffer.flip();
+ checkBuffer(buffer, 0, intCount);
+
+ completed.set(true);
+ sem.release();
+ }
+ });
+ sem.acquire(); // Wait until completion handler completes
+
+ assert completed.get();
+ }
+
+}