You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by mh...@apache.org on 2007/04/23 18:15:42 UTC

svn commit: r531521 - in /mina/sandbox/mheath/aioj/trunk: ./ src/main/c/ src/main/java/org/apache/aio/concurrent/ src/main/java/org/apache/aio/posix/ src/test/java/org/apache/aio/concurrent/ src/test/java/org/apache/aio/posix/

Author: mheath
Date: Mon Apr 23 09:15:40 2007
New Revision: 531521

URL: http://svn.apache.org/viewvc?view=rev&rev=531521
Log:
Fixed a number of issues with signal callbacks.

Removed:
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/AbstractPosixAioFuture.java
Modified:
    mina/sandbox/mheath/aioj/trunk/pom.xml
    mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
    mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java
    mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
    mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java
    mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java

Modified: mina/sandbox/mheath/aioj/trunk/pom.xml
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/pom.xml?view=diff&rev=531521&r1=531520&r2=531521
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/pom.xml (original)
+++ mina/sandbox/mheath/aioj/trunk/pom.xml Mon Apr 23 09:15:40 2007
@@ -113,6 +113,13 @@
 			<classifier>jdk15</classifier>
 			<scope>test</scope>
 		</dependency>
+		
+		<dependency>
+			<groupId>commons-math</groupId>
+			<artifactId>commons-math</artifactId>
+			<version>1.1</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 </project>

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile?view=diff&rev=531521&r1=531520&r2=531521
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/Makefile Mon Apr 23 09:15:40 2007
@@ -13,6 +13,7 @@
 JAVA_BUILD_DIR := $(TARGET_DIR)/classes
 
 JAVA_HOME ?= /opt/java/jdk
+JVM_SHARED_LIB := $(JAVA_HOME)/jre/lib/i386/server
 
 INCLUDES := -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(WORKING_DIR)
 
@@ -26,7 +27,7 @@
 all: $(TARGET)
 
 $(TARGET): org_apache_aio.cpp $(WORKING_DIR)/org_apache_aio_test_Test.h $(WORKING_DIR)/org_apache_aio_posix_PosixAsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_linux_LinuxAsynchronousFileChannel.h $(WORKING_DIR)/org_apache_aio_posix_PosixAioFutureImpl.h
-	g++ -shared -lrt -lstdc++ $(INCLUDES) org_apache_aio.cpp -o $(TARGET)
+	g++ -O2 -shared -L$(JVM_SHARED_LIB) -ljsig -ljvm -lrt -lstdc++ $(INCLUDES) org_apache_aio.cpp -o $(TARGET)
 
 $(WORKING_DIR)/org_apache_aio_test_Test.h: $(TARGET_DIR)/classes/org/apache/aio/test/Test.class
 	mkdir -p $(TARGET_DIR)/jni

Modified: mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp?view=diff&rev=531521&r1=531520&r2=531521
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/c/org_apache_aio.cpp Mon Apr 23 09:15:40 2007
@@ -33,7 +33,7 @@
 // The signal number to use for AIO callbacks
 #define SIG_AIO (SIGRTMIN + 5)
 
-#define DEBUG
+//#define DEBUG
 
 #ifdef DEBUG
 #define LOG_DEBUG(s) { fprintf(stdout, s); fflush(stdout); }
@@ -329,13 +329,19 @@
 		LOG_DEBUG("In signal handler\n");
 		
 		JNIEnv *env;
-		jvm->GetEnv((void**)&env, JNI_VERSION);
+		jint retVal = jvm->GetEnv((void**)&env, JNI_VERSION);
 		if (env == NULL) {
-			perror("Unable to obtain reference to JNI Environment, can not handle AIO completion\n");
-		} else {
-			struct aio_request *req = (struct aio_request *)info->si_value.sival_ptr;
-			processPosixAioCallback(env, req);
+			switch(retVal) {
+				case JNI_EDETACHED:
+					perror("The current thread is detached.  Unable to obtain reference to JNI Environment.  Can not complete AIO operation.");
+					return;
+				case JNI_EVERSION:
+					perror("Java version error");
+					return;
+			}
 		}
+		struct aio_request *req = (struct aio_request *)info->si_value.sival_ptr;
+		processPosixAioCallback(env, req);
 	}
 }
 

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java?view=diff&rev=531521&r1=531520&r2=531521
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/concurrent/AbstractAioFutureCallBackHandler.java Mon Apr 23 09:15:40 2007
@@ -16,7 +16,12 @@
     }
 
     public synchronized void addCompletionHandler(AioCompletionHandler<V> completionHandler) {
-        completionHandlers.add(completionHandler);
+        if (isDone()) {
+            //  TODO: Does this need to be executed in a Executor? 
+            completionHandler.onCompletion(this);
+        } else {
+            completionHandlers.add(completionHandler);
+        }
     }
 
     public synchronized boolean removeCompletionHandler(AioCompletionHandler<V> completionHandler) {

Modified: mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java?view=diff&rev=531521&r1=531520&r2=531521
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/main/java/org/apache/aio/posix/PosixAsynchronousFileChannel.java Mon Apr 23 09:15:40 2007
@@ -20,6 +20,10 @@
     @SuppressWarnings("unused")
     private final FileDescriptor fd;
     
+    static {
+        System.loadLibrary("aioj");
+    }
+    
     public PosixAsynchronousFileChannel(FileDescriptor fd, FileChannel channel, Modes mode, ExecutorService executorService, boolean threadNotify) {
         super(channel, mode, executorService);
         this.fd = fd;

Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java?view=diff&rev=531521&r1=531520&r2=531521
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/concurrent/TestOpen.java Mon Apr 23 09:15:40 2007
@@ -2,14 +2,18 @@
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Semaphore;
 
+import org.apache.aio.AioCallbackException;
 import org.apache.aio.AioCompletionHandler;
 import org.apache.aio.AioFuture;
 import org.apache.aio.AsynchronousFileChannel;
 import org.apache.aio.AsynchronousFileChannelFactory;
 import org.apache.aio.Modes;
+import org.apache.aio.AioFuture.ByteBufferFuture;
 import org.testng.AssertJUnit;
 import org.testng.annotations.Test;
 
@@ -37,7 +41,7 @@
         channel.close();
         assert !channel.isOpen();
     }
-    
+
     @Test(groups={"concurrent", "open"})
     public void openFileReadWrite() throws Exception {
         File testFile = File.createTempFile("aio", "test");
@@ -47,11 +51,11 @@
         assert channel.isWriteable();
         channel.close();
     }
-    
+
     @Test(timeOut=5000, groups={"concurrent", "open", "callback"})
     public void openCallback() throws Exception {
         final int callbackCount = 3;
-        
+
         File testFile = File.createTempFile("aio", "test");
         AioFuture<AsynchronousFileChannel> future = AsynchronousFileChannelFactory.open(testFile, Modes.READ_WRITE);
         final Semaphore sem = new Semaphore(callbackCount);
@@ -71,5 +75,23 @@
             throw e;
         }
     }
-    
+
+    public void foo() throws IOException, AioCallbackException, InterruptedException, ExecutionException {
+        File testFile = File.createTempFile("aio", "test");
+        AioFuture<AsynchronousFileChannel> future = AsynchronousFileChannelFactory.open(testFile, Modes.READ_WRITE);
+        AsynchronousFileChannel channel = future.get();
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        ByteBufferFuture readFuture = channel.read(buffer, 0L);
+        readFuture.addCompletionHandler(new AioCompletionHandler<Integer>() {
+            public void onCompletion(AioFuture<Integer> future) {
+                try {
+                    System.out.printf("Read %d bytes\n", future.get());
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            } 
+        });
+    }
+
 }

Modified: mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java
URL: http://svn.apache.org/viewvc/mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java?view=diff&rev=531521&r1=531520&r2=531521
==============================================================================
--- mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java (original)
+++ mina/sandbox/mheath/aioj/trunk/src/test/java/org/apache/aio/posix/TestPosixFileChannel.java Mon Apr 23 09:15:40 2007
@@ -4,6 +4,9 @@
 import java.io.FileOutputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.aio.AioCompletionHandler;
 import org.apache.aio.AioFuture;
@@ -17,51 +20,74 @@
     }
     
     public static void main(String[] args) throws Exception {
-        ByteBuffer buffer = ByteBuffer.allocateDirect(1000);
+        ByteBuffer buffer = ByteBuffer.allocate(1000);
 
-//        FileInputStream in = new FileInputStream("/etc/passwd");
+        FileInputStream in = new FileInputStream("/home/mheath/bigfile");
+        PosixAsynchronousFileChannel channel = new PosixAsynchronousFileChannel(
+                in.getFD(),
+                in.getChannel(),
+                Modes.READ_ONLY,
+                new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+                        2L, TimeUnit.SECONDS,
+                        new SynchronousQueue<Runnable>()), false);
+        final AioFuture<Integer> future = channel.read(buffer, 10L);
+        future.addCompletionHandler(new AioCompletionHandler<Integer>() {
+            public void onCompletion(AioFuture<Integer> future) {
+                try {
+                    System.out.println(String.format("Completion handler read %d bytes", future.get()));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        future.get();
+        System.out.println(buffer.position());
+        buffer.flip();
+        byte[] bytes = new byte[buffer.remaining()];
+        buffer.get(bytes);
+//        System.out.println(new String(bytes));
+
+        channel.close();
+
+        channel.getExecutorService().submit(new Runnable() {
+            public void run() {
+                try {
+                    Thread.sleep(100);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                synchronized (future) {
+                    future.notifyAll();
+                }
+            }
+        });
+        synchronized (future) {
+            future.wait();
+            System.out.println("Done waiting");
+        }
+        
+        System.out.println("Done");
+        
+        // Test output
+//        FileOutputStream out = new FileOutputStream("/tmp/aiotest");
 //        PosixAsynchronousFileChannel channel = new PosixAsynchronousFileChannel(
-//                in.getFD(),
-//                in.getChannel(),
-//                Modes.READ_ONLY,
+//                out.getFD(),
+//                out.getChannel(),
+//                Modes.READ_WRITE,
 //                Executors.newCachedThreadPool(), true);
-//        AioFuture<Integer> future = channel.read(buffer, 0L);
-//        future.addCompletionHandler(new AioCompletionHandler<Integer>() {
-//            public void onCompletion(AioFuture<Integer> future) {
-//                try {
-//                    System.out.println(String.format("Completion handler read %d bytes", future.get()));
-//                } catch (Exception e) {
-//                    e.printStackTrace();
-//                }
-//            }
-//        });
-//        future.get();
-//        System.out.println(buffer.position());
+//        
+//        buffer.put("Have a nice day.  AIO is way cool.  Let's try a heap buffer.\n".getBytes());
 //        buffer.flip();
-//        byte[] bytes = new byte[buffer.remaining()];
-//        buffer.get(bytes);
-//        System.out.println(new String(bytes));
 //
+//        ByteBufferFuture writeFuture = channel.write(buffer, 0);
+//        System.out.println("Done with write");
+//        writeFuture.get();
+//        
+//        System.out.println(buffer.position());
+//        
 //        channel.close();
 //        
-        // Test output
-        FileOutputStream out = new FileOutputStream("/tmp/aiotest");
-        PosixAsynchronousFileChannel channel = new PosixAsynchronousFileChannel(
-                out.getFD(),
-                out.getChannel(),
-                Modes.READ_WRITE,
-                Executors.newCachedThreadPool(), true);
-        
-        buffer.put("Have a nice day!\n".getBytes());
-        buffer.flip();
-
-        ByteBufferFuture writeFuture = channel.write(buffer, 0);
-        System.out.println("Done with write");
-        writeFuture.get();
-        
-        channel.close();
-        
-        System.out.println("Done!");
+//        System.out.println("Done!");
     }
 
 }