You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/01/31 05:49:35 UTC

svn commit: r501727 - in /mina/sandbox/mheath/aioj/trunk/src: main/java/org/apache/aio/ main/java/org/apache/aio/concurrent/ test/java/org/apache/aio/concurrent/

Author: mheath
Date: Tue Jan 30 20:49:34 2007
New Revision: 501727

URL: http://svn.apache.org/viewvc?view=rev&rev=501727
Log:
Added read test.
Made some API changes.

Added:
    mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java
Modified:
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
    mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioCompletionHandler.java Tue Jan 30 20:49:34 2007
@@ -29,7 +29,7 @@
  *
  * @param <T> The type of future object to be processed.
  */
-public interface AioCompletionHandler<T extends AioFuture> {
+public interface AioCompletionHandler<T> {
 
     /**
      * The method invoked when the operation completes.
@@ -39,6 +39,6 @@
      * 
      * @param future  The future object representing the file channel operation.
      */
-    void onCompletion(T future);
+    void onCompletion(AioFuture<T> future);
 	
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/AioFuture.java Tue Jan 30 20:49:34 2007
@@ -39,22 +39,14 @@
  */
 public interface AioFuture<V> extends Future<V> {
     
-    static interface BatchFuture extends AioFuture<Long> {
+    public static interface BatchFuture extends AioFuture<Long> {
         ByteBufferPosition[] getBatch();
         ByteBufferPosition[] getUsedBatch();
         int getOffset();
         int getLength();
     }
 
-    static interface ByteBufferFuture extends AioFuture<Integer>, ByteBufferPosition {}
-    
-    //static interface LockFuture extends AioFuture<FileLock, LockFuture> {}
-    
-    //static interface OpenFuture extends AioFuture<AsynchronousFileChannel, OpenFuture> {}
-    
-    //static interface SyncFuture extends AioFuture<Void, SyncFuture> {}
-    
-    //static interface TruncateFuture extends AioFuture<Void, TruncateFuture> {}
+    public static interface ByteBufferFuture extends AioFuture<Integer>, ByteBufferPosition {}
     
     /**
      * Associates a {@link AioCompletionHandler} with this future.  The completion handler will be invoked when
@@ -62,7 +54,7 @@
      * 
      * @param completionHandler  The completion handler to be invoked.
      */
-    void addCompletionHandler(AioCompletionHandler<AioFuture> completionHandler);
+    void addCompletionHandler(AioCompletionHandler<V> completionHandler);
 
     /**
      * Removes a {@link AioCompletionHandler} from this future.
@@ -70,7 +62,7 @@
      * @param completionHandler  The completion handler to be disassociated with this future
      * @return  Returns true if the completion handler was disassociated with this future, false otherwise.
      */
-    boolean removeCompletionHandler(AioCompletionHandler<AioFuture> completionHandler);
+    boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler);
 
     /**
      * Returns the {@link AsynchronousFileChannel} where the operation represnted by this future orginated.

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AioFutureImpl.java Tue Jan 30 20:49:34 2007
@@ -113,7 +113,7 @@
     private Future<V> future;
     private Object attachment = null;
     private AsynchronousFileChannel asynchronousFileChannel = null;
-    private final List<AioCompletionHandler<AioFuture>> completionHandlers = new ArrayList<AioCompletionHandler<AioFuture>>();
+    private final List<AioCompletionHandler<V>> completionHandlers = new ArrayList<AioCompletionHandler<V>>();
     private ExecutionException exception;
 
     public AioFutureImpl() {
@@ -124,17 +124,17 @@
         this.asynchronousFileChannel = channel;
     }
 
-    public synchronized void addCompletionHandler(AioCompletionHandler<AioFuture> completionHandler) {
+    public synchronized void addCompletionHandler(AioCompletionHandler<V> completionHandler) {
         completionHandlers.add(completionHandler);
     }
 
-    public synchronized boolean removeCompletionHandler(AioCompletionHandler<AioFuture> completionHandler) {
+    public synchronized boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler) {
         return completionHandlers.remove(completionHandler);
     }
     
     public synchronized void callCompletionHandlers() {
         try {
-            for (AioCompletionHandler<AioFuture> completionHandler : completionHandlers) {
+            for (AioCompletionHandler<V> completionHandler : completionHandlers) {
                 completionHandler.onCompletion(this);
             }
         } catch (Throwable e) {

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/ConcurrentAsynchronousFileChannel.java Tue Jan 30 20:49:34 2007
@@ -58,6 +58,12 @@
 
     @Override
     public AioFuture<FileLock> lock(final long position, final long size, final boolean shared) throws AioException {
+        if (position < 0) {
+            throw new IllegalArgumentException("position can not be negative");
+        }
+        if (size < 0) {
+            throw new IllegalArgumentException("size can not be negative");
+        }
         final AioFutureImpl<FileLock> aioFuture = new AioFutureImpl<FileLock>(this);
         Future<FileLock> future = executorService.submit(new Callable<FileLock>() {
            public FileLock call() throws Exception {
@@ -73,6 +79,9 @@
 
     @Override
     public ByteBufferFuture read(final ByteBuffer buffer, final long position) throws AioException {
+        if (position < 0) {
+            throw new IllegalArgumentException("position can not be negative");
+        }
         final ByteBufferFutureImpl byteBufferFuture = new ByteBufferFutureImpl(buffer, position);
         Future<Integer> future = executorService.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
@@ -87,6 +96,12 @@
 
     @Override
     public BatchFuture read(final ByteBufferPosition[] byteBufferPositions, final int offset, final int length) throws AioException {
+        if (offset < 0) {
+            throw new IllegalArgumentException("offset can not be negative");
+        }
+        if (length < 0) {
+            throw new IllegalArgumentException("length can not be negative");
+        }
         final BatchFutureImpl batchFuture = new BatchFutureImpl(byteBufferPositions, offset, length);
         Future<Long> future = executorService.submit(new Callable<Long>() {
            public Long call() throws Exception {
@@ -128,6 +143,9 @@
 
     @Override
     public AioFuture<Void> truncate(final long size) throws AioException {
+        if (size < 0) {
+            throw new IllegalArgumentException("size can not be negative");
+        }
         final AioFutureImpl<Void> aioFuture = new AioFutureImpl<Void>();
         Future<Void> future = executorService.submit(new Callable<Void>() {
             public Void call() throws Exception {
@@ -143,7 +161,7 @@
     @Override
     public FileLock tryLock(long position, long size, boolean shared) throws AioException {
         try {
-            return channel.tryLock();
+            return channel.tryLock(position, size, shared);
         } catch (IOException e) {
             throw new AioException(e);
         }
@@ -151,6 +169,9 @@
 
     @Override
     public ByteBufferFuture write(final ByteBuffer buffer, final long position) {
+        if (position < 0) {
+            throw new IllegalArgumentException("position can not be negative");
+        }
         final ByteBufferFutureImpl byteBufferFuture = new ByteBufferFutureImpl(buffer, position);
         Future<Integer> future = executorService.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
@@ -165,6 +186,12 @@
 
     @Override
     public BatchFuture write(final ByteBufferPosition[] byteBufferPositions, final int offset, final int length) {
+        if (offset < 0) {
+            throw new IllegalArgumentException("offset can not be negative");
+        }
+        if (length < 0) {
+            throw new IllegalArgumentException("length can not be negative");
+        }
         final BatchFutureImpl batchFuture = new BatchFutureImpl(byteBufferPositions, offset, length);
         Future<Long> future = executorService.submit(new Callable<Long>() {
            public Long call() throws Exception {

Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java?view=diff&rev=501727&r1=501726&r2=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java Tue Jan 30 20:49:34 2007
@@ -57,7 +57,7 @@
         final Semaphore sem = new Semaphore(callbackCount);
         sem.acquire(callbackCount);
         for (int i = 0; i < callbackCount; i++) {
-            future.addCompletionHandler(new AioCompletionHandler<AioFuture>() {
+            future.addCompletionHandler(new AioCompletionHandler<AsynchronousFileChannel>() {
                 public void onCompletion(AioFuture future) {
                     sem.release();
                 }

Added: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java?view=auto&rev=501727
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java (added)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestReads.java Tue Jan 30 20:49:34 2007
@@ -0,0 +1,112 @@
+package org.apache.aio.concurrent;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.aio.AioCompletionHandler;
+import org.apache.aio.AioFuture;
+import org.apache.aio.AsynchronousFileChannel;
+import org.apache.aio.AsynchronousFileChannelFactory;
+import org.apache.aio.Modes;
+import org.apache.aio.AioFuture.ByteBufferFuture;
+import org.testng.annotations.Test;
+
+/**
+ * Tests reading from an {@link AsynchronousFileChannel} and being notified through an
+ * {@link AioCompletionHandler}.
+ */
+public class TestReads {
+
+    /**
+     * Fills <tt>file</tt> with the integers <tt>0</tt> to <tt>count - 1</tt>, four bytes each, as
+     * encoded by {@link ByteBuffer#putInt(int)}.
+     *
+     * @param file  The file to write to.
+     * @param count  The number of integers to write.
+     * @throws IOException  If the file can not be opened or written.
+     */
+    private void writeToFile(File file, int count) throws IOException {
+        ByteBuffer buffer = ByteBuffer.allocate(count * 4);
+        for (int i = 0; i < count; i++) {
+            buffer.putInt(i);
+        }
+        buffer.flip();
+
+        FileOutputStream out = new FileOutputStream(file);
+        try {
+            FileChannel channel = out.getChannel();
+            // A single write() is allowed to write fewer bytes than requested, so drain the buffer.
+            while (buffer.hasRemaining()) {
+                channel.write(buffer);
+            }
+        } finally {
+            // Close the stream (and with it the channel) even when the write fails.
+            out.close();
+        }
+    }
+
+    /**
+     * Asserts that the next <tt>length</tt> integers read from <tt>buffer</tt> are the consecutive
+     * values <tt>start</tt>, <tt>start + 1</tt>, and so on.  Reading advances the buffer's position.
+     *
+     * @param buffer  The buffer to read from, positioned at the first integer to check.
+     * @param start  The expected value of the first integer.
+     * @param length  The number of integers to check.
+     */
+    protected void checkBuffer(ByteBuffer buffer, int start, int length) {
+        for (int i = 0; i < length; i++) {
+            assert (start + i) == buffer.getInt();
+        }
+    }
+
+    /**
+     * Writes a few integers to a temporary file, reads them back asynchronously and verifies the
+     * data from within the completion handler.
+     */
+    @Test(groups={"concurrent", "read", "callback"})
+    public void simpleRead() throws Exception {
+        File file = File.createTempFile("temp", "aio");
+        file.deleteOnExit(); // Do not leave the temporary file behind
+
+        // Write data to file
+        final int intCount = 5;
+        writeToFile(file, intCount);
+
+        AioFuture<AsynchronousFileChannel> future = AsynchronousFileChannelFactory.open(file, Modes.READ_ONLY);
+        AsynchronousFileChannel channel = future.get();
+
+        ByteBuffer buffer = ByteBuffer.allocate(intCount * 4);
+
+        final Semaphore sem = new Semaphore(1);
+        final AtomicBoolean completed = new AtomicBoolean(false); // AtomicBoolean used as a mutable boolean
+        sem.acquire();
+        AioFuture<Integer> readFuture = channel.read(buffer, 0);
+        readFuture.addCompletionHandler(new AioCompletionHandler<Integer>() {
+            public void onCompletion(AioFuture<Integer> future) {
+                try {
+                    ByteBufferFuture byteBufferFuture = (ByteBufferFuture)future;
+                    ByteBuffer readBuffer = byteBufferFuture.getByteBuffer();
+                    assert readBuffer.position() == readBuffer.limit();
+
+                    readBuffer.flip();
+                    checkBuffer(readBuffer, 0, intCount);
+
+                    completed.set(true);
+                } finally {
+                    // Always wake the test thread; a failed assertion leaves completed false so the
+                    // test fails below instead of blocking forever.
+                    sem.release();
+                }
+            }
+        });
+        sem.acquire(); // Wait until completion handler completes
+
+        assert completed.get();
+    }
+
+}